Just want to double check: how are the non-logging aspects looking here? Do
you have 100 (respectively 90) elements in the created collection? How
about the output of the MapElements?

And from your level of familiarity, I assume you are comfortable with the
fact that on other runners there may be multiple VMs, containers, JVMs,
etc., processing arbitrary fractions of the 100 input elements. I guess the
debugging scenario is assuming that the DirectRunner is not doing anything
like this. (FWIW, Brian's result sounds like multiple deserializations of
the same DoFn, so the local variable is re-initialized to 0.)
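
To illustrate the deserialization point, here is a minimal sketch using plain
Java serialization only, not Beam itself. The class and method names
(SerializationCopyDemo, CountingFn, StaticCountingFn, roundTrip) are
hypothetical stand-ins for a DoFn and for whatever a runner does when it
ships a function to workers. It assumes each worker gets its own
deserialized copy of the function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializationCopyDemo {
  // Hypothetical stand-in for a DoFn whose counter is an instance field
  // (or a captured local variable): the counter is serialized with the function.
  static class CountingFn implements Serializable {
    private final AtomicInteger counter = new AtomicInteger();

    int process() {
      return counter.getAndIncrement();
    }
  }

  // A static counter is not part of the serialized object, so every copy
  // inside the same JVM shares it.
  static final AtomicInteger STATIC_COUNTER = new AtomicInteger();

  static class StaticCountingFn implements Serializable {
    int process() {
      return STATIC_COUNTER.getAndIncrement();
    }
  }

  // Serialize and deserialize a value, returning an independent copy.
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // First value reported by each deserialized copy of the instance-field version.
  static int[] firstIndexPerCopy(int copies) {
    CountingFn original = new CountingFn();
    int[] seen = new int[copies];
    for (int i = 0; i < copies; i++) {
      seen[i] = roundTrip(original).process();
    }
    return seen;
  }

  // Same experiment with the static counter, reset first so runs are repeatable.
  static int[] firstIndexPerCopyStatic(int copies) {
    STATIC_COUNTER.set(0);
    StaticCountingFn original = new StaticCountingFn();
    int[] seen = new int[copies];
    for (int i = 0; i < copies; i++) {
      seen[i] = roundTrip(original).process();
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(firstIndexPerCopy(3)));       // [0, 0, 0]
    System.out.println(Arrays.toString(firstIndexPerCopyStatic(3))); // [0, 1, 2]
  }
}
```

Each copy of the instance-field version starts again from the value the
counter had when it was serialized, which would produce duplicates like the
ones Brian saw. The static version keeps counting, but only within a single
JVM.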

Kenn

On Fri, Sep 10, 2021 at 6:11 AM Daniel Collins <[email protected]> wrote:

> > Why are you setting isBlockOnRun to false?
>
> The above code is part of an integration test being added in
> https://github.com/apache/beam/pull/15418/files, which polls state to
> determine whether all expected messages sent into Pub/Sub Lite have been
> received. There are two flows in this pipeline: one publishing the
> messages, and the other subscribing to them and producing a streaming
> PCollection whose elements are recorded. The problem is that not all
> messages are being published into Pub/Sub Lite, since the transform after
> Create (the one with the log line mentioned above) is never called with them.
>
> On Thu, Sep 9, 2021 at 11:36 PM Reuven Lax <[email protected]> wrote:
>
>> AFAICT that isStreaming setting isn't referenced anywhere in DirectRunner.
>>
>> Why are you setting isBlockOnRun to false?
>>
>> On Thu, Sep 9, 2021 at 8:19 PM Daniel Collins <[email protected]>
>> wrote:
>>
>>> > In your example, is createdCount a private static member of a class
>>> > that contains the function for defining the pipeline?
>>>
>>> Yes, sorry that that was unclear. This is a private static member of the
>>> class. https://issues.apache.org/jira/browse/BEAM-12867 should provide
>>> code making this more clear.
>>>
>>> I think this may have to do with the streaming setting? This is how my
>>> pipeline is initialized:
>>>
>>> @Rule public transient TestPipeline pipeline = TestPipeline.create();
>>>
>>> @Test
>>> public void testReadWrite() throws Exception {
>>>   pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
>>>   pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
>>>   applyTransform(pipeline);
>>>   pipeline.run();
>>>   /* sleep loop */
>>> }
>>>
>>>
>>>
>>> On Thu, Sep 9, 2021 at 5:57 PM Brian Hulette <[email protected]>
>>> wrote:
>>>
>>>> In your example, is createdCount a private static member of a class
>>>> that contains the function for defining the pipeline? I don't think the
>>>> code is valid as written.
>>>>
>>>> I tried to reproduce what you're seeing, but I'm not able to. If I make
>>>> the AtomicInteger a private static member of the class as described above,
>>>> it works as I'd expect, logging 0-99 (with COUNT=100), mostly in order.
>>>> If I make the AtomicInteger a (non-static) variable in the method I do
>>>> see surprising output: numbers from 0-17, with many duplicates.
>>>>
>>>> Brian
>>>>
>>>>
>>>> On Thu, Sep 9, 2021 at 9:05 AM Daniel Collins <[email protected]>
>>>> wrote:
>>>>
>>>>> This variable is static and atomic, so it is safe to modify in a
>>>>> transform from DirectRunner, and it is unrelated to the issue I'm
>>>>> seeing here.
>>>>>
>>>>> On Thu, Sep 9, 2021 at 11:46 AM Kyle Weaver <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Mutating variables like createdCount that are defined outside of the
>>>>>> Beam pipeline from within a DoFn is unsafe. In this case you could use
>>>>>> Beam's built-in Count transform instead.
>>>>>>
>>>>>> On Thu, Sep 9, 2021 at 7:00 AM Daniel Collins <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm running into a weird issue where the following code only runs 50
>>>>>>> iterations, i.e. "Created message index {}" is only printed for numbers
>>>>>>> 1-50. When changing MESSAGE_COUNT to 90, it is only printed for numbers
>>>>>>> 1-39. This sounds like a bug in either DirectRunner or Create. Does
>>>>>>> anyone have any ideas?
>>>>>>>
>>>>>>> -Daniel
>>>>>>>
>>>>>>> int MESSAGE_COUNT = 100;
>>>>>>> private static AtomicInteger createdCount = new AtomicInteger();
>>>>>>> PCollection<Integer> indexes =
>>>>>>>     pipeline.apply(
>>>>>>>         "createIndexes",
>>>>>>>         Create.of(IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList())));
>>>>>>> PCollection<PubSubMessage> messages =
>>>>>>>     indexes.apply(
>>>>>>>         "createMessages",
>>>>>>>         MapElements.via(
>>>>>>>             new SimpleFunction<Integer, PubSubMessage>(
>>>>>>>                 index -> {
>>>>>>>                   System.err.println(
>>>>>>>                       "Created message index " + createdCount.incrementAndGet());
>>>>>>>                   return Message.builder()
>>>>>>>                       .setData(ByteString.copyFromUtf8(index.toString()))
>>>>>>>                       .build()
>>>>>>>                       .toProto();
>>>>>>>                 }) {}));
>>>>>>> messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
>>>>>>> messages.apply(
>>>>>>>     "writeMessages",
>>>>>>>     PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
>>>>>>>
>>>>>>