[ https://issues.apache.org/jira/browse/BEAM-2155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890465#comment-16890465 ]

Udi Meiri commented on BEAM-2155:
---------------------------------

I tried a slightly modified version of the user's code, and it seems to work: the main input reads the side input map (all 10 entries) on every element. Logs show:
{code}
2019-07-22T20:42:22.597Z Combined: K: 1057 V: TEST MapSize: 10 I 
2019-07-22T20:42:27.559Z Combined: K: 1058 V: TEST MapSize: 10 I 
2019-07-22T20:42:32.699Z Combined: K: 1059 V: TEST MapSize: 10 I 
2019-07-22T20:42:37.516Z Combined: K: 1060 V: TEST MapSize: 10 I 
2019-07-22T20:42:42.633Z Combined: K: 1061 V: TEST MapSize: 10 I 
{code}
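As a quick sanity check on the timestamps above: the main input in the code used is GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)), so consecutive log lines should be roughly 5 seconds apart. The small plain-Java sketch below (the class name LogIntervals is hypothetical and uses only java.time, no Beam) computes the gaps between the logged timestamps; they come out at about 5 seconds each.
{code}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class LogIntervals {

  // Timestamps copied verbatim from the log lines above.
  static final String[] TIMESTAMPS = {
    "2019-07-22T20:42:22.597Z",
    "2019-07-22T20:42:27.559Z",
    "2019-07-22T20:42:32.699Z",
    "2019-07-22T20:42:37.516Z",
    "2019-07-22T20:42:42.633Z",
  };

  // Returns the gap in milliseconds between each pair of consecutive timestamps.
  static List<Long> intervalsMillis(String[] timestamps) {
    List<Long> gaps = new ArrayList<>();
    for (int i = 1; i < timestamps.length; i++) {
      Instant prev = Instant.parse(timestamps[i - 1]);
      Instant curr = Instant.parse(timestamps[i]);
      gaps.add(Duration.between(prev, curr).toMillis());
    }
    return gaps;
  }

  public static void main(String[] args) {
    // Prints [4962, 5140, 4817, 5117]
    System.out.println(intervalsMillis(TIMESTAMPS));
  }
}
{code}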

Changes:
- CountingInput has since been deprecated (https://issues.apache.org/jira/browse/BEAM-1414), so it is replaced with GenerateSequence.
- Removed DataflowPipelineOptions usage; the pipeline uses plain PipelineOptions instead.


Code used:
{code}
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class SimpleSideInputPipeline {

  private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    // Bounded side input: keys 0..9, each mapped to "TEST", viewed as a Map.
    final PCollectionView<Map<Integer, String>> sideInput =
        pipeline
            .apply(GenerateSequence.from(0).to(10))
            .apply(
                "Create KV<Integer, String>",
                ParDo.of(
                    new DoFn<Long, KV<Integer, String>>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                      }
                    }))
            .apply(View.asMap());

    // Unbounded main input: one element every 5 seconds; each element reads the side input.
    pipeline
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "Aggregate with side-input",
            ParDo.of(
                    new DoFn<Long, KV<Long, String>>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Map<Integer, String> map = c.sideInput(sideInput);

                        // All values are "TEST", so take any one (the first in iteration order).
                        String firstVal = map.values().iterator().next();
                        LOG.info(
                            "Combined: K: "
                                + c.element()
                                + " V: "
                                + firstVal
                                + " MapSize: "
                                + map.size());
                        c.output(KV.of(c.element(), firstVal));
                      }
                    })
                .withSideInputs(sideInput));

    pipeline.run();
  }
}
{code}
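To see that "V: TEST MapSize: 10" is exactly what the two branches should produce, here is a plain-Java model of their logic with no Beam dependency (the class and method names are hypothetical). It relies on GenerateSequence.from(0).to(10) emitting 0..9, since the upper bound is exclusive. Note that it only reproduces the data flow; it does not exercise Dataflow's side-input machinery, which is what this issue is about.
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.LongStream;

public class SideInputLogicSketch {

  // Models the side-input branch: 0..9 (exclusive upper bound 10), each
  // turned into an (Integer, "TEST") entry and collected into a Map.
  static Map<Integer, String> buildSideInputMap() {
    Map<Integer, String> map = new HashMap<>();
    LongStream.range(0, 10).forEach(i -> map.put((int) i, "TEST"));
    return map;
  }

  // Models the "Aggregate with side-input" DoFn: pick one value from the map
  // and format the same message that the pipeline logs.
  static String combine(long element, Map<Integer, String> map) {
    String firstVal = map.values().iterator().next();
    return "Combined: K: " + element + " V: " + firstVal + " MapSize: " + map.size();
  }

  public static void main(String[] args) {
    // Prints "Combined: K: 1057 V: TEST MapSize: 10", matching the first log line.
    System.out.println(combine(1057L, buildSideInputMap()));
  }
}
{code}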

> Investigate problem creating Map PCollection view on Dataflow
> -------------------------------------------------------------
>
>                 Key: BEAM-2155
>                 URL: https://issues.apache.org/jira/browse/BEAM-2155
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Kenneth Knowles
>            Priority: Major
>
> A user reports a side input seemingly never becoming available: 
> http://stackoverflow.com/questions/43755254/dataflow-map-side-input-issue
> The code seems good at first glance, and it works in the direct runner.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
