Thanks Luke. I will go through them and come back if I have any questions.

Regards,
Praveen

On Tue, Aug 4, 2020 at 3:55 PM Luke Cwik <[email protected]> wrote:

> Take a look at the WatchGrowthFn[1] and also the in-progress Kafka PR[2].
>
> 1: https://github.com/apache/beam/blob/6612b24ada9382706373db547b5606d6e0496b02/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787
> 2: https://github.com/apache/beam/pull/11749
>
> On Tue, Aug 4, 2020 at 3:33 PM Praveen K Viswanathan <[email protected]> wrote:
>
>> Thanks for the suggestions Luke. As you know, we are just starting and
>> should be able to switch to SplittableDoFn, if that's the future of Beam IO
>> Connectors. The SplittableDoFn page has the design details but it would be
>> great if we can look into an IO connector built using SplittableDoFn
>> for reference and to map the design details with actual implementation.
>> Could you please suggest any such IO for reference.
>>
>> I will also parallely try your suggestion in advance() and checkpoint
>> mark coder to close that issue.
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Aug 3, 2020 at 3:28 PM Luke Cwik <[email protected]> wrote:
>>
>>> Since you are working on a new connector I would very strongly
>>> suggest writing it as a splittable DoFn instead of an UnboundedSource. See
>>> this thread[1] about additional details and some caveats on the
>>> recommendation.
>>>
>>> 1) You can return false from advance and the runner will execute advance
>>> at some point in time instead of sleeping. This is also the correct thing
>>> to do if you hit a throttling error. With a splittable DoFn you can return
>>> a process continuation allowing you to suggest an amount of time to wait
>>> before being resumed.
>>>
>>> 2) It looks like null was returned as the checkpoint mark coder[2].
>>>
>>> 1: https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
>>> 2: https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223
>>>
>>> On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <[email protected]> wrote:
>>>
>>>> Hello Dev team,
>>>>
>>>> We are giving our first shot in writing Beam IO connector for Oracle
>>>> Streaming Service (OSS). The plan is to first implement it for enterprise
>>>> use and based on the feedback and stability make it available open source.
>>>> This is our first attempt in developing a Beam IO connector and so far we
>>>> have progressed with the help of Beam documentation and other related IOs
>>>> like KafkaIO, KinesisIO. Thanks to the community on that front.
>>>>
>>>> Now OSS *has a read limit of 200ms* so when we read the data as shown
>>>> below in our UnboundedReader's *advance()* method
>>>>
>>>> // Get Messages
>>>> GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);
>>>>
>>>> We are able to read around five messages but after that we are getting
>>>> a *request throttling error*
>>>>
>>>> Request was throttled because requests limit exhausted, next request
>>>> can be made in 200 ms
>>>>
>>>> We tried with an initial solution of introducing *Thread.sleep(200)*
>>>> before the getMessages to see how it is behaving and this time we are
>>>> *able to read around 300+ messages*. With the expertise available in this
>>>> forum, I would like to hear inputs on two points.
>>>>
>>>> 1. How to implement this feature in a proper way rather than just with
>>>>    a one-line Thread.sleep(200)
>>>>
>>>> 2. After adding Thread.sleep(200) and reading 300+ messages the
>>>>    pipeline encountered the error below. I do not see any
>>>>    implementation-specific detail in the stack trace.
>>>>    Can I get some insight into what this error could be and how to
>>>>    handle it?
>>>>
>>>> java.lang.NullPointerException
>>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>>>>   at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>>>>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>>>>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>>>>   at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>>>>   at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>>>>   at java.lang.Thread.run (Thread.java:748)
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

--
Thanks,
Praveen K Viswanathan
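
For reference, the advice in the thread to return false from advance() on a throttling error, rather than calling Thread.sleep(200), might look roughly like the sketch below. It deliberately does not depend on Beam or on the Oracle SDK: MessageSource, ThrottledException, MillisClock and ThrottleAwareReader are hypothetical names invented for illustration, and only the advance()/getCurrent() shape is borrowed from the reader discussed above. Treat it as one possible approach under those assumptions, not as the connector's actual code.

```java
// Hypothetical stand-in for the streaming client; NOT the real OSS SDK API.
interface MessageSource {
    // Returns the next message, or null if none is available right now.
    String poll() throws ThrottledException;
}

// Hypothetical clock abstraction so the backoff window can be tested without waiting.
interface MillisClock {
    long nowMillis();
}

// Hypothetical exception carrying the "next request can be made in N ms" hint.
class ThrottledException extends Exception {
    final long retryAfterMillis;

    ThrottledException(long retryAfterMillis) {
        super("throttled, retry in " + retryAfterMillis + " ms");
        this.retryAfterMillis = retryAfterMillis;
    }
}

// Mimics the advance()/getCurrent() shape of an unbounded reader without using Beam.
class ThrottleAwareReader {
    private final MessageSource source;
    private final MillisClock clock;
    private long notBeforeMillis = 0L; // earliest time the next request may be sent
    private String current;            // last message successfully read

    ThrottleAwareReader(MessageSource source, MillisClock clock) {
        this.source = source;
        this.clock = clock;
    }

    // Never blocks: reports "no data yet" while throttled and lets the caller retry later.
    boolean advance() {
        if (clock.nowMillis() < notBeforeMillis) {
            return false; // still inside the backoff window, skip the request entirely
        }
        try {
            String next = source.poll();
            if (next == null) {
                return false; // nothing available right now
            }
            current = next;
            return true;
        } catch (ThrottledException e) {
            // Remember when the service allows the next request, then yield.
            notBeforeMillis = clock.nowMillis() + e.retryAfterMillis;
            return false;
        }
    }

    String getCurrent() {
        if (current == null) {
            throw new java.util.NoSuchElementException("advance() has not returned true yet");
        }
        return current;
    }
}
```

In a real reader the clock would simply be System::currentTimeMillis, and the caller (the runner, in Beam's case) decides when to call advance() again, so no worker thread is held while the limit resets.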
