[ https://issues.apache.org/jira/browse/BEAM-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik resolved BEAM-3210.
-----------------------------
Resolution: Not A Problem
Assignee: Luke Cwik (was: Thomas Groh)
Fix Version/s: Not applicable (was: 2.1.0)
> The problem about the use of waitUntilFinish() in DirectRunner
> --------------------------------------------------------------
>
> Key: BEAM-3210
> URL: https://issues.apache.org/jira/browse/BEAM-3210
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Affects Versions: 2.1.0
> Environment: Ubuntu 14.04.3 LTS
> JDK 1.8
> Beam 2.1.0
> Maven 3.5.0
> Reporter: Rick Lin
> Assignee: Luke Cwik
> Fix For: Not applicable
>
>
> Dear sir,
> The documentation for waitUntilFinish() says that it "waits until the pipeline
> finishes and returns the final status."
> In my project, a static variable of list type is used to record the contents
> of a PCollection.
> I therefore call "p.run().waitUntilFinish()" to wait until the pipeline
> finishes, so that no records are missing from the list.
> Unfortunately, the list *sometimes* records a "null" value instead of the
> actual value.
> To explain this clearly, I provide my Java code below.
> import java.io.IOException;
> import java.util.ArrayList;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.Mean;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
> import org.apache.beam.sdk.values.PCollection;
>
> public class BeamTestStatic extends Thread {
>   // Shared static list that every DoFn invocation appends to.
>   public static ArrayList<Double> myList = new ArrayList<Double>();
>
>   public static class StaticTest extends DoFn<Double, Void> {
>     @ProcessElement
>     public void test(ProcessContext c) {
>       // Record the current element in the static list.
>       myList.add(c.element());
>     }
>   }
>
>   public static void main(String[] args) throws IOException {
>     StaticTest testa = new StaticTest();
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     PCollection<Double> data = p.apply("Rawdata",
>         Create.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
>     PCollection<Void> listtest = data.apply(ParDo.of(testa));
>     // Block until the pipeline has finished before reading the list.
>     p.run().waitUntilFinish();
>     System.out.println("mylist_size_a=" + myList.size());
>
>     for (int i = 0; i < myList.size(); i++) {
>       System.out.println("mylist_data=" + myList.get(i));
>     }
>   }
> }
> In addition, the result of my code is:
> mylist_size_a=10
> mylist_data=null
> mylist_data=4.0
> mylist_data=5.0
> mylist_data=9.0
> mylist_data=6.0
> mylist_data=1.0
> mylist_data=7.0
> mylist_data=8.0
> mylist_data=10.0
> mylist_data=3.0
> If you have any further information, I am glad to be informed.
> Thanks
> Rick
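
Note on the report above: this notification does not state why the issue was resolved as "Not A Problem". One plausible reading, offered here as an assumption, is that the runner may invoke the DoFn from several worker threads, while java.util.ArrayList is not safe for concurrent writes. Unsynchronized add() calls can leave a slot unset (read back as null) even when size() looks correct. The sketch below uses only the JDK, not Beam; the class name SharedListSketch and the helper collect are hypothetical names chosen for illustration. It shows the same write pattern made safe with Collections.synchronizedList.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedListSketch {

    // Appends threads * perThread values to the given list from a pool of
    // worker threads, then waits for all workers to finish.
    static List<Double> collect(List<Double> target, int threads, int perThread)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    target.add((double) (base + i));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return target;
    }

    public static void main(String[] args) throws InterruptedException {
        // The synchronized wrapper serializes add() calls across threads.
        List<Double> safe =
                collect(Collections.synchronizedList(new ArrayList<>()), 4, 1000);
        System.out.println("safe_size=" + safe.size());
        System.out.println("safe_has_null=" + safe.contains(null));
    }
}
```

Passing a plain new ArrayList<>() to collect instead may intermittently produce null entries or a wrong size, similar to the output reported above, although the effect is timing-dependent and will not appear on every run. In the reporter's pipeline, the analogous change would be to declare myList through Collections.synchronizedList; element order would still not be guaranteed.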
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)