pabloem commented on a change in pull request #14531:
URL: https://github.com/apache/beam/pull/14531#discussion_r626103212



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete.TwitterStreamGenerator;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import twitter4j.TwitterStream;
+import twitter4j.*;
+import twitter4j.conf.ConfigurationBuilder;
+
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TwitterIO {
+
+    /**
+     * Implementation of read methods.
+     */
+    public static class Read extends PTransform<PBegin, PCollection<Read.TwitterConnection>> {
+
+        static BlockingQueue<Status> queue;
+        static String key;
+        static String secret;
+        static String token;
+        static String tokenSecret;
+
+        public Read(Builder builder) {

Review comment:
       Let's make the constructor non-public so users can't instantiate `Read` directly with it.
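   A minimal sketch of this suggestion, written as a plain-Java stand-in rather than the real Beam `PTransform`; the static `read()` factory is an assumed entry point in the usual Beam IO style. With a `private` constructor, code outside `TwitterIO` cannot write `new TwitterIO.Read()` and must go through the factory:

   ```java
   // Simplified stand-in for the PR's TwitterIO; not the actual Beam transform.
   class TwitterIO {

     // Hypothetical public entry point: the only way callers obtain a Read.
     public static Read read() {
       return new Read();
     }

     public static class Read {
       // Private: only code inside TwitterIO (such as read()) can construct a Read.
       private Read() {}
     }
   }
   ```

   Callers then write `TwitterIO.read()`; `new TwitterIO.Read()` from any other class fails to compile.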

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java
##########
@@ -0,0 +1,157 @@
+public class TwitterIO {

Review comment:
       
   Can you add Javadoc with an example of how the transform is used?
   
   In Beam, our transforms are usually used like this:
   
   ```
   p.apply(TwitterIO.read()    // returns a Read transform that has `withX` methods
        .withKey(...)          // returns a Read transform with the appropriate values set
        .withToken(...))       // etc.
   ```
   
   So rather than using a Builder and setters, I recommend adding `withX` methods, which you can implement something like this:
   
   ```
   public Read withX(String x) {
     return this.toBuilder().setX(x).build();
   }
   ```
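   A self-contained sketch of the `withX`/`toBuilder()` pattern, assuming a hand-written builder. Beam's own IOs usually generate this with AutoValue (an `@AutoValue` class exposing an abstract `toBuilder()`); the sketch avoids that only so it compiles without an annotation processor. `Read` here is a simplified stand-in, not a `PTransform`, and only `key` and `token` are shown:

   ```java
   import java.util.Objects;

   // Simplified stand-in for TwitterIO; Read is not a Beam PTransform here.
   class TwitterIO {

     // Entry point: starts from an unconfigured Read.
     public static Read read() {
       return new Read.Builder().build();
     }

     public static final class Read {
       private final String key;
       private final String token;

       private Read(Builder builder) {
         this.key = builder.key;
         this.token = builder.token;
       }

       // Each withX copies the current settings, changes one field, and returns a new Read.
       public Read withKey(String key) {
         return toBuilder().setKey(Objects.requireNonNull(key, "key")).build();
       }

       public Read withToken(String token) {
         return toBuilder().setToken(Objects.requireNonNull(token, "token")).build();
       }

       public String getKey() {
         return key;
       }

       public String getToken() {
         return token;
       }

       private Builder toBuilder() {
         return new Builder().setKey(key).setToken(token);
       }

       // Not public: users configure a Read only through the withX methods.
       private static final class Builder {
         private String key;
         private String token;

         Builder setKey(String key) {
           this.key = key;
           return this;
         }

         Builder setToken(String token) {
           this.token = token;
           return this;
         }

         Read build() {
           return new Read(this);
         }
       }
     }
   }
   ```

   Because every `withX` returns a fresh instance, a partially configured `Read` can be reused and extended without changing the original.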

##########
File path: examples/java/build.gradle
##########
@@ -52,6 +52,10 @@ configurations.sparkRunnerPreCommit {
 }
 
 dependencies {
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.28.0'
+  implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.28.0'
+  implementation group: 'org.twitter4j', name: 'twitter4j-stream', version: '4.0.7'
+  implementation 'org.projectlombok:lombok:1.18.18'

Review comment:
       Is this dependency (Lombok) necessary?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java
##########
@@ -0,0 +1,157 @@
+public class TwitterIO {
+
+    /**
+     * Implementation of read methods.
+     */
+    public static class Read extends PTransform<PBegin, PCollection<Read.TwitterConnection>> {
+
+        static BlockingQueue<Status> queue;
+        static String key;
+        static String secret;
+        static String token;
+        static String tokenSecret;
+
+        public Read(Builder builder) {
+            key = builder.key;
+            secret = builder.secret;
+            token = builder.token;
+            tokenSecret = builder.tokenSecret;
+            queue = new LinkedBlockingQueue<>();
+        }
+
+        @Override
+        public PCollection<TwitterConnection> expand(PBegin input) throws IllegalArgumentException {
+            if (key == null || secret == null || token == null || tokenSecret == null) {
+                throw new IllegalArgumentException("Please provide key, secret, token and token secret");
+            }
+
+            return input.apply(Create.of(TwitterConnection.getInstance()));

Review comment:
       Here we also want to apply the `ReadFromTwitterDoFn`, right? And the output would then be the contents of a tweet (e.g. `twitter4j.Status` or something similar) rather than a `TwitterConnection`.
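   One possible shape for that step, assuming a twitter4j `StatusListener` pushes each tweet into the `BlockingQueue` the PR already declares: the DoFn drains the queue in bounded batches and emits each element downstream. This plain-Java sketch substitutes `String` for `twitter4j.Status` and a `Consumer` for Beam's `OutputReceiver`; `TweetDrainer`, `drainBatch`, and `maxBatch` are hypothetical names:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.function.Consumer;

   // Hypothetical helper; in a real DoFn, `out` would be the OutputReceiver.
   final class TweetDrainer {
     private TweetDrainer() {}

     // Moves up to maxBatch queued tweets to `out` without blocking.
     // Returns how many were emitted (0 if the queue was empty).
     static int drainBatch(BlockingQueue<String> queue, int maxBatch, Consumer<String> out) {
       List<String> batch = new ArrayList<>(maxBatch);
       queue.drainTo(batch, maxBatch); // removes elements in FIFO order for LinkedBlockingQueue
       batch.forEach(out);
       return batch.size();
     }
   }
   ```

   `drainTo` never waits, so an empty queue yields an empty batch; a real reader would still need to decide how long to wait (for example with `poll(timeout, unit)`) before checking again.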

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java
##########
@@ -0,0 +1,157 @@
+public class TwitterIO {
+
+    /**
+     * Implementation of read methods.
+     */
+    public static class Read extends PTransform<PBegin, PCollection<Read.TwitterConnection>> {
+
+        static BlockingQueue<Status> queue;
+        static String key;
+        static String secret;
+        static String token;
+        static String tokenSecret;
+
+        public Read(Builder builder) {
+            key = builder.key;
+            secret = builder.secret;
+            token = builder.token;
+            tokenSecret = builder.tokenSecret;
+            queue = new LinkedBlockingQueue<>();
+        }
+
+        @Override
+        public PCollection<TwitterConnection> expand(PBegin input) throws IllegalArgumentException {
+            if (key == null || secret == null || token == null || tokenSecret == null) {
+                throw new IllegalArgumentException("Please provide key, secret, token and token secret");
+            }
+
+            return input.apply(Create.of(TwitterConnection.getInstance()));
+        }
+
+        public static class Builder {

Review comment:
       Same with the builder: let's make it non-public.

##########
File path: examples/java/build.gradle
##########
@@ -52,6 +52,10 @@ configurations.sparkRunnerPreCommit {
 }
 
 dependencies {
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.28.0'
+  implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.28.0'

Review comment:
       You can remove these two, as the project already depends on them (see lines 62-66).

##########
File path: examples/twitter-beam-java/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins {

Review comment:
       This file is no longer necessary, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
