pabloem commented on a change in pull request #14531: URL: https://github.com/apache/beam/pull/14531#discussion_r613646369
########## File path: examples/twitter-beam-java/src/main/java/ReadFromTweeterDoFn.java ########## @@ -0,0 +1,76 @@ +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.joda.time.Duration; +import org.joda.time.Instant; +import twitter4j.Status; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; + [email protected] +final class ReadFromTweeterDoFn extends DoFn<String, String> { + + static class OffsetTracker extends RestrictionTracker<OffsetRange, Long> implements Serializable{ + private OffsetRange restriction; + + OffsetTracker(OffsetRange holder) { + this.restriction = holder; + } + + @Override + public boolean tryClaim(Long position) { + System.out.println("-------------- Claiming " + position + " used to have: " + restriction); + long fetchedRecords = + this.restriction == null ? 0 : this.restriction.getTo() + 1; + this.restriction = new OffsetRange(0, fetchedRecords); + return true; + } + + @Override + public OffsetRange currentRestriction() { + return restriction; + } + + @Override + public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { + System.out.println("-------------- Trying to split: fractionOfRemainder=" + fractionOfRemainder); Review comment: remove or log statement? 
: ) ########## File path: examples/twitter-beam-java/src/main/java/TweeterStream.java ########## @@ -0,0 +1,35 @@ +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.*; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +import java.util.Collections; + +public class TweeterStream { + + public static void main(String[] args) { + Pipeline pipeline = Pipeline.create(); + PCollection<String> tweetStream = pipeline.apply(Create.of(Collections.singletonList(""))).apply(ParDo.of(new ReadFromTweeterDoFn())) Review comment: For sources it's usually more convenient to provide an interface, e.g. `TwitterIO`, that takes a configuration. Perhaps something like this: For providing the configuration: ``` pipeline.apply(TwitterIO.readTweets() .withUserToken(...) .with...()) ``` And for using a properties file, perhaps ``` pipeline.apply(TwitterIO.withConfigFile("twitter4j.properties")) ``` What do you think? You would extend `PTransform`, and in the `expand` method you would do the whole `pipeline.apply(Create.of(Collections.singletonList(""))).apply(ParDo.of(new ReadFromTweeterDoFn()))`, so that the Create and the ReadFrom.... 
are just implementation details. ########## File path: examples/twitter-beam-java/src/main/java/ReadFromTweeterDoFn.java ########## @@ -0,0 +1,76 @@ +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.joda.time.Duration; +import org.joda.time.Instant; +import twitter4j.Status; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; + [email protected] +final class ReadFromTweeterDoFn extends DoFn<String, String> { + + static class OffsetTracker extends RestrictionTracker<OffsetRange, Long> implements Serializable{ + private OffsetRange restriction; + + OffsetTracker(OffsetRange holder) { + this.restriction = holder; + } + + @Override + public boolean tryClaim(Long position) { + System.out.println("-------------- Claiming " + position + " used to have: " + restriction); Review comment: Maybe remove this line, or make it a LOG statement? ########## File path: examples/twitter-beam-java/src/main/java/ReadFromTweeterDoFn.java ########## @@ -0,0 +1,76 @@ +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.joda.time.Duration; +import org.joda.time.Instant; +import twitter4j.Status; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; + [email protected] +final class ReadFromTweeterDoFn extends DoFn<String, String> { Review comment: Perhaps we can add a parameter for the maximum number of tweets to read: if it's not set, read forever; if it's set, read N tweets. Thoughts? Perhaps we could also add a parameter to read tweets until a certain date or for a certain duration. Thoughts? 
########## File path: examples/twitter-beam-java/src/main/java/ReadFromTweeterDoFn.java ########## @@ -0,0 +1,76 @@ +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.joda.time.Duration; +import org.joda.time.Instant; +import twitter4j.Status; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; + [email protected] +final class ReadFromTweeterDoFn extends DoFn<String, String> { Review comment: This SDF is missing a watermark estimator. Could you add that too, please? Here's some info about how to add watermark estimation: https://beam.apache.org/documentation/programming-guide/#sdf-basics -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
