[
https://issues.apache.org/jira/browse/BEAM-5653?focusedWorklogId=154423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154423
]
ASF GitHub Bot logged work on BEAM-5653:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Oct/18 19:45
Start Date: 15/Oct/18 19:45
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #6649: [BEAM-5653] Fix
overriding coders due to duplicate coderId generation
URL: https://github.com/apache/beam/pull/6649
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index 128034ad88a..92592100245 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -41,7 +41,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -190,27 +189,17 @@ public Node apply(MutableNetwork<Node, Edge> input) {
.setId(idGenerator.get())
.setStateApiServiceDescriptor(stateApiServiceDescriptor);
- // Seed the ProcessBundleDescriptor with the bits from the pipeline.
- // We do _not_ seed the transforms, as a ProcessBundleDescriptor will execute all transforms
- // so misc. client-side transforms are meaningless.
- processBundleDescriptor
- .putAllCoders(pipeline.getComponents().getCodersMap())
- .putAllPcollections(pipeline.getComponents().getPcollectionsMap())
- .putAllWindowingStrategies(pipeline.getComponents().getWindowingStrategiesMap());
-
// For intermediate PCollections we fabricate, we make a bogus WindowingStrategy
// TODO: create a correct windowing strategy, including coders and environment
// An SdkFunctionSpec is invalid without a working environment reference. We can revamp that
// when we inline SdkFunctionSpec and FunctionSpec, both slated for inlining wherever they occur
- SdkComponents sdkComponents = SdkComponents.create();
- // Attempt to use the environment supplied by the pipeline, otherwise default to use
- // the Java environment.
- try {
- sdkComponents.registerEnvironment(
- Iterables.getOnlyElement(pipeline.getComponents().getEnvironmentsMap().values()));
- } catch (NoSuchElementException e) {
+ SdkComponents sdkComponents = SdkComponents.create(pipeline.getComponents());
+
+ // Default to use the Java environment if pipeline doesn't have environment specified.
+ if (pipeline.getComponents().getEnvironmentsMap().isEmpty()) {
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
}
+
String fakeWindowingStrategyId = "fakeWindowingStrategy" + idGenerator.get();
try {
RunnerApi.MessageWithComponents fakeWindowingStrategyProto =
@@ -236,7 +225,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
InstructionOutput instructionOutput = node.getInstructionOutput();
- String coderId = idGenerator.get();
+ String coderId = "generatedCoder" + idGenerator.get();
try (ByteString.Output output = ByteString.newOutput()) {
try {
Coder<?> javaCoder =
@@ -271,7 +260,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
e);
}
- String pcollectionId = idGenerator.get();
+ String pcollectionId = "generatedPcollection" + idGenerator.get();
processBundleDescriptor.putPcollections(
pcollectionId,
RunnerApi.PCollection.newBuilder()
@@ -285,7 +274,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
for (ParallelInstructionNode node :
Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
ParallelInstruction parallelInstruction = node.getParallelInstruction();
- String ptransformId = idGenerator.get();
+ String ptransformId = "generatedPtransform" + idGenerator.get();
ptransformIdToNameContexts.put(
ptransformId,
NameContext.create(
@@ -394,7 +383,8 @@ public Node apply(MutableNetwork<Node, Edge> input) {
}
for (Node predecessorOutput : input.predecessors(node)) {
- pTransform.putInputs(idGenerator.get(), nodesToPCollections.get(predecessorOutput));
+ pTransform.putInputs(
+ "generatedInput" + idGenerator.get(), nodesToPCollections.get(predecessorOutput));
}
for (Edge edge : input.outEdges(node)) {
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 154423)
Time Spent: 4.5h (was: 4h 20m)
Remaining Estimate: 67.5h (was: 67h 40m)
> Dataflow FnApi Worker overrides some of Coders due to coder ID generation
> collision.
> ------------------------------------------------------------------------------------
>
> Key: BEAM-5653
> URL: https://issues.apache.org/jira/browse/BEAM-5653
> Project: Beam
> Issue Type: Test
> Components: java-fn-execution
> Reporter: Mikhail Gryzykhin
> Assignee: Mikhail Gryzykhin
> Priority: Blocker
> Fix For: 2.8.0
>
> Original Estimate: 72h
> Time Spent: 4.5h
> Remaining Estimate: 67.5h
>
> A recent refactoring introduced a bug in the Java FnApi Worker: it overrides
> Coders in the ProcessBundleDescriptor sent to the SDK Harness, which causes
> jobs to fail.
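
The collision described above, and the prefixing approach taken in the diff, can be illustrated with a minimal, self-contained sketch. It is not the worker's code: the class name, the helper methods, the string placeholders standing in for coders, and the assumption that a pipeline-supplied coder ID happens to equal a bare counter value such as "1" are all hypothetical. Only the "generatedCoder" prefix is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class CoderIdCollisionSketch {

  // Hypothetical stand-in for the worker's idGenerator: yields "1", "2", ...
  static Supplier<String> newIdGenerator() {
    AtomicLong counter = new AtomicLong();
    return () -> Long.toString(counter.incrementAndGet());
  }

  // Registers one generated coder under (prefix + next id). Returns the value
  // previously stored under that key, or null if the key was unused.
  static String registerGenerated(
      Map<String, String> coders, String prefix, Supplier<String> ids) {
    return coders.put(prefix + ids.get(), "generated coder");
  }

  public static void main(String[] args) {
    // Assumption: the pipeline already contains a coder whose ID is "1".
    Map<String, String> unprefixed = new HashMap<>();
    unprefixed.put("1", "pipeline coder");
    // A bare generated ID "1" silently replaces the pipeline's coder.
    String lost = registerGenerated(unprefixed, "", newIdGenerator());
    System.out.println("unprefixed overwrote: " + lost);

    Map<String, String> prefixed = new HashMap<>();
    prefixed.put("1", "pipeline coder");
    // With a prefix, the generated key is "generatedCoder1" and nothing is lost.
    String kept = registerGenerated(prefixed, "generatedCoder", newIdGenerator());
    System.out.println("prefixed overwrote: " + kept);
  }
}
```

Under these assumptions the first call returns the pipeline's coder (it has been overwritten) and the second returns null (both entries survive). A prefix only avoids collisions as long as pipeline-supplied IDs do not use the same prefix.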