[
https://issues.apache.org/jira/browse/BEAM-2155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890465#comment-16890465
]
Udi Meiri commented on BEAM-2155:
---------------------------------
I've tried my own slightly modified version and it seems to work. Logs show:
{code}
2019-07-22T20:42:22.597Z Combined: K: 1057 V: TEST MapSize: 10 I
2019-07-22T20:42:27.559Z Combined: K: 1058 V: TEST MapSize: 10 I
2019-07-22T20:42:32.699Z Combined: K: 1059 V: TEST MapSize: 10 I
2019-07-22T20:42:37.516Z Combined: K: 1060 V: TEST MapSize: 10 I
2019-07-22T20:42:42.633Z Combined: K: 1061 V: TEST MapSize: 10 I
{code}
Changes:
- Replaced CountingInput with GenerateSequence; CountingInput has been deprecated
in the meantime (https://issues.apache.org/jira/browse/BEAM-1414).
- Removed DataflowPipelineOptions usage.
Code used:
{code}
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSideInputPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    final PCollectionView<Map<Integer, String>> sideInput =
        pipeline
            .apply(GenerateSequence.from(0).to(10))
            .apply(
                "Create KV<Integer, String>",
                ParDo.of(
                    new DoFn<Long, KV<Integer, String>>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                      }
                    }))
            .apply(View.asMap());

    pipeline
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "Aggregate with side-input",
            ParDo.of(
                    new DoFn<Long, KV<Long, String>>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Map<Integer, String> map = c.sideInput(sideInput);
                        // get first segment from map
                        Object[] values = map.values().toArray();
                        String firstVal = (String) values[0];
                        LOG.info(
                            "Combined: K: "
                                + c.element()
                                + " V: "
                                + firstVal
                                + " MapSize: "
                                + map.size());
                        c.output(KV.of(c.element(), firstVal));
                      }
                    })
                .withSideInputs(sideInput));

    pipeline.run();
  }
}
{code}
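A side note on the "first" value read in processElement above: the java.util.Map interface does not guarantee iteration order, so values[0] is only predictable here because every value is "TEST". If the values differed, one option would be to pick the entry with the smallest key. A minimal sketch, assuming comparable keys; SideInputMapSketch and firstValueByKey are hypothetical names used for illustration, not Beam APIs, and the code is plain Java with no Beam dependency:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Beam or of the pipeline above.
final class SideInputMapSketch {

  /** Returns the value stored under the smallest key, or empty if the map has no entries. */
  static <K extends Comparable<K>, V> Optional<V> firstValueByKey(Map<K, V> map) {
    if (map.isEmpty()) {
      return Optional.empty();
    }
    // Select by key order rather than relying on the map's iteration order.
    K smallestKey = Collections.min(map.keySet());
    return Optional.ofNullable(map.get(smallestKey));
  }

  public static void main(String[] args) {
    // Insert keys in descending order to show that insertion order does not matter.
    Map<Integer, String> map = new HashMap<>();
    for (int i = 9; i >= 0; i--) {
      map.put(i, "TEST-" + i);
    }
    System.out.println(firstValueByKey(map).orElse("<empty>")); // prints "TEST-0"
  }
}
```

Inside the DoFn, the map returned by c.sideInput(sideInput) could be passed to such a helper in place of the toArray() call.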
> Investigate problem creating Map PCollection view on Dataflow
> -------------------------------------------------------------
>
> Key: BEAM-2155
> URL: https://issues.apache.org/jira/browse/BEAM-2155
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Kenneth Knowles
> Priority: Major
>
> A user reports a side input seemingly never becoming available:
> http://stackoverflow.com/questions/43755254/dataflow-map-side-input-issue
> The code seems good at first glance, and it works in the direct runner.