Thanks for the suggestions, Luke. As you know, we are just getting started, so
we should be able to switch to SplittableDoFn if that is the future of Beam IO
connectors. The SplittableDoFn page has the design details, but it would be
great to look at an IO connector built with SplittableDoFn for reference, so
that we can map the design details to an actual implementation.
Could you please suggest such an IO?

In parallel, I will also try your suggestions for advance() and the checkpoint
mark coder to close out that issue.
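
For reference, a minimal sketch of how the advance() suggestion could look:
instead of sleeping, the reader remembers when the next request is allowed and
returns false until then, leaving it to the runner to call advance() again
later. This is plain Java rather than a real UnboundedReader so that it stays
self-contained. ThrottleAwareReader, the fetcher supplier (standing in for
streamClient.getMessages) and the 200 ms constant are illustrative assumptions,
not Beam or OSS SDK APIs.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical stand-in for the reader; mirrors advance()/getCurrent() only. */
final class ThrottleAwareReader {
    /** Assumed minimum gap between service requests, taken from the throttling message. */
    static final long MIN_INTERVAL_MS = 200;

    private final Supplier<List<String>> fetcher; // stand-in for the getMessages call
    private final LongSupplier clockMs;           // injectable clock, e.g. System::currentTimeMillis
    private final Queue<String> buffer = new ArrayDeque<>();
    private long nextAllowedMs = Long.MIN_VALUE;  // first request is always allowed
    private String current;

    ThrottleAwareReader(Supplier<List<String>> fetcher, LongSupplier clockMs) {
        this.fetcher = fetcher;
        this.clockMs = clockMs;
    }

    /** Returns true if a record is available; never blocks or sleeps. */
    boolean advance() {
        if (buffer.isEmpty()) {
            long now = clockMs.getAsLong();
            if (now < nextAllowedMs) {
                // Too soon to call the service again: report "no data right now".
                return false;
            }
            nextAllowedMs = now + MIN_INTERVAL_MS;
            buffer.addAll(fetcher.get());
        }
        current = buffer.poll();
        return current != null;
    }

    /** Returns the record made available by the last successful advance(). */
    String getCurrent() {
        if (current == null) {
            throw new NoSuchElementException("advance() has not returned true");
        }
        return current;
    }
}
```

The same bookkeeping could also be triggered by catching the throttling error
itself and pushing the next allowed request time forward, rather than always
assuming a fixed 200 ms gap.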

Thanks,
Praveen

On Mon, Aug 3, 2020 at 3:28 PM Luke Cwik <[email protected]> wrote:

> Since you are working on a new connector, I would very strongly
> suggest writing it as a splittable DoFn instead of an UnboundedSource. See
> this thread[1] for additional details and some caveats on the
> recommendation.
>
> 1) You can return false from advance and the runner will execute advance
> at some point in time instead of sleeping. This is also the correct thing
> to do if you hit a throttling error. With a splittable DoFn you can return
> a process continuation allowing you to suggest an amount of time to wait
> before being resumed.
>
> 2) It looks like null was returned as the checkpoint mark coder[2].
>
> 1:
> https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
> 2:
> https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223
>
> On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <
> [email protected]> wrote:
>
>> Hello Dev team,
>>
>> We are taking our first shot at writing a Beam IO connector for Oracle
>> Streaming Service (OSS). The plan is to first implement it for enterprise
>> use and, based on feedback and stability, make it available as open source.
>> This is our first attempt at developing a Beam IO connector, and so far we
>> have progressed with the help of the Beam documentation and related IOs
>> such as KafkaIO and KinesisIO. Thanks to the community on that front.
>>
>> Now, OSS *has a read limit of 200ms*, so when we read the data as shown
>> below in our UnboundedReader's *advance()* method
>>
>> // Get messages
>> GetMessagesResponse getResponse =
>>     this.streamClient.getMessages(getRequest);
>>
>> we are able to read around five messages, but after that we get a
>> *request throttling error*:
>>
>> Request was throttled because requests limit exhausted, next request can
>> be made in 200 ms
>>
>> As an initial workaround, we added *Thread.sleep(200)* before the
>> getMessages call to see how it behaves, and this time we were *able to
>> read 300+ messages*. With the expertise available in this forum, I would
>> like to hear input on two points.
>>
>>    1. How can we implement this properly, rather than with a one-line
>>       Thread.sleep(200)?
>>    2. After adding Thread.sleep(200) and reading 300+ messages, the
>>       pipeline hit the error below. I do not see any
>>       implementation-specific detail in the stack trace. Can I get some
>>       insight into what this error could be and how to handle it?
>>
>>    java.lang.NullPointerException
>>        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>>        at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>>        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>>        at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>>        at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>>        at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>>        at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>>        at java.lang.Thread.run (Thread.java:748)
>>
>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan
