[
https://issues.apache.org/jira/browse/BEAM-3243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327820#comment-16327820
]
ASF GitHub Bot commented on BEAM-3243:
--------------------------------------
jkff closed pull request #4185: BEAM-3243 better error message when there are
conflicting anonymous names
URL: https://github.com/apache/beam/pull/4185
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub does not show the original diff of such a PR once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 5358f7d9e27..d6991323854 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -19,12 +19,19 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.transform;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -493,7 +500,7 @@ OutputT applyTransform(String name, InputT input,
/** Lazily initialized; access via {@link #getCoderRegistry()}. */
@Nullable private CoderRegistry coderRegistry;
- private final List<String> unstableNames = new ArrayList<>();
+ private final Multimap<String, PTransform<?, ?>> instancePerName =
ArrayListMultimap.create();
private final PipelineOptions defaultOptions;
private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
@@ -520,11 +527,8 @@ public String toString() {
String namePrefix = transforms.getCurrent().getFullName();
String uniqueName = uniquifyInternal(namePrefix, name);
- boolean nameIsUnique = uniqueName.equals(buildName(namePrefix, name));
-
- if (!nameIsUnique) {
- unstableNames.add(uniqueName);
- }
+ final String builtName = buildName(namePrefix, name);
+ instancePerName.put(builtName, transform);
LOG.debug("Adding {} to {}", transform, this);
transforms.pushNode(uniqueName, input, transform);
@@ -569,21 +573,29 @@ void applyReplacement(
@VisibleForTesting
void validate(PipelineOptions options) {
this.traverseTopologically(new ValidateVisitor(options));
- if (!unstableNames.isEmpty()) {
+ final Collection<Map.Entry<String, Collection<PTransform<?, ?>>>> errors =
+ Collections2.filter(instancePerName.asMap().entrySet(),
+ Predicates.not(new IsUnique<String, PTransform<?, ?>>()));
+ if (!errors.isEmpty()) {
switch (options.getStableUniqueNames()) {
case OFF:
break;
case WARNING:
LOG.warn(
"The following transforms do not have stable unique names: {}",
- Joiner.on(", ").join(unstableNames));
+ Joiner.on(", ").join(transform(errors, new KeysExtractor())));
break;
- case ERROR:
+ case ERROR: // be very verbose here since it will just fail the
execution
throw new IllegalStateException(
String.format(
"Pipeline update will not be possible"
+ " because the following transforms do not have stable
unique names: %s.",
- Joiner.on(", ").join(unstableNames)));
+ Joiner.on(", ").join(transform(errors, new
KeysExtractor()))) + "\n\n"
+ + "Conflicting instances:\n"
+ + Joiner.on("\n").join(transform(
+ errors, new
UnstableNameToMessage(instancePerName)))
+ + "\n\nYou can fix it adding a name when you call
apply(): "
+ + "pipeline.apply(<name>, <transform>).");
default:
throw new IllegalArgumentException(
"Unrecognized value for stable unique names: " +
options.getStableUniqueNames());
@@ -636,4 +648,42 @@ public void visitPrimitiveTransform(Node node) {
node.getTransform().validate(options);
}
}
+
+ private static class TransformToMessage implements Function<PTransform<?,
?>, String> {
+ @Override
+ public String apply(final PTransform<?, ?> transform) {
+ return " - " + transform;
+ }
+ }
+
+ private static class UnstableNameToMessage implements
+ Function<Map.Entry<String, Collection<PTransform<?, ?>>>, String> {
+ private final Multimap<String, PTransform<?, ?>> instances;
+
+ private UnstableNameToMessage(final Multimap<String, PTransform<?, ?>>
instancePerName) {
+ this.instances = instancePerName;
+ }
+
+ @Override
+ public String apply(final Map.Entry<String, Collection<PTransform<?, ?>>>
input) {
+ final Collection<PTransform<?, ?>> values =
instances.get(input.getKey());
+ return "- name=" + input.getKey() + ":\n"
+ + Joiner.on("\n").join(transform(values, new
TransformToMessage()));
+ }
+ }
+
+ private static class KeysExtractor implements
+ Function<Map.Entry<String, Collection<PTransform<?, ?>>>, String> {
+ @Override
+ public String apply(final Map.Entry<String, Collection<PTransform<?, ?>>>
input) {
+ return input.getKey();
+ }
+ }
+
+ private static class IsUnique<K, V> implements Predicate<Map.Entry<K,
Collection<V>>> {
+ @Override
+ public boolean apply(final Map.Entry<K, Collection<V>> input) {
+ return input != null && input.getValue().size() == 1;
+ }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 1dc9d4403f7..fc35a891a44 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -270,6 +270,10 @@
String getJobName();
void setJobName(String jobName);
+ @Default.Long(1000)
+ Long getMaxBundleSize();
+ void setMaxBundleSize(Long size);
+
/**
* A {@link DefaultValueFactory} that obtains the class of the {@code
DirectRunner} if it exists
* on the classpath, and throws an exception otherwise.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 2ad84fb0626..211c3a7c4db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -688,6 +688,11 @@ public void populateDisplayData(Builder builder) {
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
return PCollectionViews.toAdditionalInputs(sideInputs);
}
+
+ @Override
+ public String toString() {
+ return fn.toString();
+ }
}
/**
@@ -831,6 +836,11 @@ public TupleTagList getAdditionalOutputTags() {
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
return PCollectionViews.toAdditionalInputs(sideInputs);
}
+
+ @Override
+ public String toString() {
+ return fn.toString();
+ }
}
private static void populateDisplayData(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 57fdd75ef56..af0f34c778e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -51,10 +51,12 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.UserCodeException;
@@ -67,6 +69,8 @@
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
@@ -117,6 +121,63 @@ public PipelineResult run(Pipeline pipeline) {
}
}
+ @Test
+ public void testConflictingNames() {
+ final PipelineOptions options = TestPipeline.testingPipelineOptions();
+ final Pipeline p = Pipeline.create(options);
+
+ // Check pipeline runner correctly catches user errors.
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(new BaseMatcher<String>() { // more readable than a
regex
+ @Override
+ public void describeTo(final Description description) {
+ description.appendText("validates the conflicting instances are "
+ + "listed into the exception message");
+ }
+
+ @Override
+ public boolean matches(final Object o) {
+ /*
+ example value (first 2 lines are a single one):
+
+ Pipeline update will not be possible because the following
transforms do not have stable
+ unique names: ParDo(Anonymous)2.
+
+ Conflicting instances:
+ - name=ParDo(Anonymous):
+ - org.apache.beam.sdk.PipelineTest$3@75d2da2d
+ - org.apache.beam.sdk.PipelineTest$2@4278284b
+
+ You can fix it adding a name when you call apply():
pipeline.apply(<name>, <transform>).
+ */
+ final String sanitized = String.class.cast(o)
+
.replaceAll("\\$[\\p{Alnum}]+@[\\p{Alnum}]+", "\\$x@y");
+ return sanitized.contains(
+ "Conflicting instances:\n"
+ + "- name=ParDo(Anonymous):\n"
+ + " - org.apache.beam.sdk.PipelineTest$x@y\n"
+ + " - org.apache.beam.sdk.PipelineTest$x@y\n\n"
+ + "You can fix it adding a name when you call apply(): "
+ + "pipeline.apply(<name>, <transform>).");
+ }
+ });
+ p.apply(Create.of("a"))
+ // 2 anonymous classes are conflicting
+ .apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void onElement(final ProcessContext ctx) {
+ ctx.output(ctx.element());
+ }
+ }))
+ .apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void onElement(final ProcessContext ctx) {
+ // no-op
+ }
+ }));
+ p.run();
+ }
+
@Test
public void testPipelineUserExceptionHandling() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> multiple anonymous DoFn lead to conflicting names
> -------------------------------------------------
>
> Key: BEAM-3243
> URL: https://issues.apache.org/jira/browse/BEAM-3243
> Project: Beam
> Issue Type: Task
> Components: sdk-java-core
> Reporter: Romain Manni-Bucau
> Assignee: Romain Manni-Bucau
> Priority: Major
> Fix For: 2.3.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)