This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82ab144e6d3 [cleanup][io] Remove Pulsar IO Twitter connector (#25080)
82ab144e6d3 is described below

commit 82ab144e6d38b5544ab814b02abf8c4099b94150
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 16 19:27:23 2025 +0200

    [cleanup][io] Remove Pulsar IO Twitter connector (#25080)
---
 deployment/terraform-ansible/deploy-pulsar.yaml    |   1 -
 distribution/io/src/assemble/io.xml                |   1 -
 pulsar-bom/pom.xml                                 |   5 -
 pulsar-functions/worker/pom.xml                    |  25 ++-
 .../rest/api/v3/AbstractFunctionsResourceTest.java |  19 ++-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   2 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  50 +++---
 .../java/org/apache/pulsar/io/core/PushSource.java |   2 +-
 pulsar-io/docs/pom.xml                             |   5 -
 pulsar-io/pom.xml                                  |   4 +-
 pulsar-io/twitter/pom.xml                          |  91 -----------
 .../apache/pulsar/io/twitter/TwitterFireHose.java  | 172 ---------------------
 .../pulsar/io/twitter/TwitterFireHoseConfig.java   | 172 ---------------------
 .../apache/pulsar/io/twitter/data/TweetData.java   | 130 ----------------
 .../pulsar/io/twitter/data/TwitterRecord.java      |  65 --------
 .../pulsar/io/twitter/data/package-info.java       |  19 ---
 .../twitter/endpoint/SampleStatusesEndpoint.java   |  41 -----
 .../pulsar/io/twitter/endpoint/package-info.java   |  19 ---
 .../org/apache/pulsar/io/twitter/package-info.java |  19 ---
 .../resources/META-INF/services/pulsar-io.yaml     |  23 ---
 .../io/twitter/TwitterFireHoseConfigTest.java      | 108 -------------
 .../twitter/src/test/resources/sourceConfig.yaml   |  23 ---
 22 files changed, 62 insertions(+), 934 deletions(-)

diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml 
b/deployment/terraform-ansible/deploy-pulsar.yaml
index 698a1c01cd8..fede8391994 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -161,7 +161,6 @@
 #        - rabbitmq
 #        - redis
 #        - solr
-#        - twitter
     - name: Set up broker
       template:
         src: "../templates/broker.conf"
diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index ded0557d5e3..3e4ed50d939 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -45,7 +45,6 @@
      -->
 
     
<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
-    
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml
index 1edb4590327..f004d63363d 100644
--- a/pulsar-bom/pom.xml
+++ b/pulsar-bom/pom.xml
@@ -575,11 +575,6 @@
         <artifactId>pulsar-io-solr</artifactId>
         <version>${project.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.pulsar</groupId>
-        <artifactId>pulsar-io-twitter</artifactId>
-        <version>${project.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.apache.pulsar</groupId>
         <artifactId>pulsar-io</artifactId>
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 02171c8276f..b27b6d73574 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -179,7 +179,15 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-twitter</artifactId>
+      <artifactId>pulsar-io-data-generator</artifactId>
+      <version>${project.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-netty</artifactId>
       <version>${project.version}</version>
       <type>pom</type>
       <scope>test</scope>
@@ -245,12 +253,21 @@
                 </artifactItem>
                 <artifactItem>
                   <groupId>${project.groupId}</groupId>
-                  <artifactId>pulsar-io-twitter</artifactId>
+                  <artifactId>pulsar-io-data-generator</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}</outputDirectory>
+                  <destFileName>pulsar-io-data-generator.nar</destFileName>
+                </artifactItem>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>pulsar-io-netty</artifactId>
                   <version>${project.version}</version>
                   <type>jar</type>
                   <overWrite>true</overWrite>
                   <outputDirectory>${project.build.directory}</outputDirectory>
-                  <destFileName>pulsar-io-twitter.nar</destFileName>
+                  <destFileName>pulsar-io-netty.nar</destFileName>
                 </artifactItem>
                 <artifactItem>
                   <groupId>${project.groupId}</groupId>
@@ -282,10 +299,10 @@
         <configuration>
           <systemPropertyVariables>
             
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
+            
<pulsar-io-netty.nar.path>${project.build.directory}/pulsar-io-netty.nar</pulsar-io-netty.nar.path>
             
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
             
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
             
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
-            
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
             <!-- valid jar file that is not a valid nar file -->
             
<pulsar-io-invalid.nar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-io-invalid.nar.path>
           </systemPropertyVariables>
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
index dd67c39c75f..aafd78cbba7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
@@ -86,7 +86,8 @@ public abstract class AbstractFunctionsResourceTest {
     protected static final String CASSANDRA_STRING_SINK = 
"org.apache.pulsar.io.cassandra.CassandraStringSink";
     protected static final int PARALLELISM = 1;
     private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = 
"pulsar-io-cassandra.nar.path";
-    private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH = 
"pulsar-io-twitter.nar.path";
+    private static final String SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH = 
"pulsar-io-data-generator.nar.path";
+    private static final String SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH = 
"pulsar-io-netty.nar.path";
     private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH = 
"pulsar-io-invalid.nar.path";
     private static final String 
SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
             "pulsar-functions-api-examples.nar.path";
@@ -123,10 +124,16 @@ public abstract class AbstractFunctionsResourceTest {
                         + SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + " 
system property"));
     }
 
-    public static File getPulsarIOTwitterNar() {
-        return new 
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH)
-                , "pulsar-io-twitter.nar file location must be specified with "
-                        + SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + " 
system property"));
+    public static File getPulsarIODataGenNar() {
+        return new 
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH)
+                , "pulsar-io-data-generator.nar file location must be 
specified with "
+                        + SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH + " 
system property"));
+    }
+
+    public static File getPulsarIONettyNar() {
+        return new 
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH)
+                , "pulsar-io-netty.nar file location must be specified with "
+                        + SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH + " system 
property"));
     }
 
     public static File getPulsarIOInvalidNar() {
@@ -211,7 +218,7 @@ public abstract class AbstractFunctionsResourceTest {
     }
 
     protected File getDefaultNarFile() {
-        return getPulsarIOTwitterNar();
+        return getPulsarIODataGenNar();
     }
 
     protected void doSetup() throws Exception {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 7c0929a6a9f..2ef2e42d6e7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -226,7 +226,7 @@ public class SinkApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     public void testRegisterSinkInvalidJarNoSink() throws IOException {
         mockInstanceUtils();
         try {
-            try (FileInputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+            try (FileInputStream inputStream = new 
FileInputStream(getPulsarIONettyNar())) {
                 testRegisterSinkMissingArguments(
                         TENANT,
                         NAMESPACE,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 81170cf569a..f84b3c2e524 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -74,7 +74,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     private static final String source = "test-source";
     private static final String outputTopic = "test-output-topic";
     private static final String outputSerdeClassName = 
TopicSchema.DEFAULT_SERDE;
-    private static final String TWITTER_FIRE_HOSE = 
"org.apache.pulsar.io.twitter.TwitterFireHose";
+    private static final String DATAGEN_SOURCE = 
"org.apache.pulsar.io.datagenerator.DataGeneratorSource";
     private SourcesImpl resource;
 
     @Override
@@ -109,7 +109,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null
             );
@@ -130,7 +130,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null
             );
@@ -151,7 +151,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null
             );
@@ -206,7 +206,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Source Package is not provided")
     public void testRegisterSourceMissingPackageDetails() throws IOException {
-        try (InputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+        try (InputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
             testRegisterSourceMissingArguments(
                     TENANT,
                     NAMESPACE,
@@ -215,7 +215,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     null,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null
             );
@@ -271,7 +271,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
 
     @Test
     public void testRegisterSourceNoOutputTopic() throws IOException {
-        try (InputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+        try (InputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
             testRegisterSourceMissingArguments(
                     TENANT,
                     NAMESPACE,
@@ -280,7 +280,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     null,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null
             );
@@ -302,7 +302,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     null,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     "http://localhost:1234/test";
             );
@@ -386,7 +386,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     }
 
     private void registerDefaultSource() throws IOException {
-        
registerDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
+        
registerDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
     }
 
     private void registerDefaultSourceWithPackageUrl(String packageUrl) throws 
IOException {
@@ -482,7 +482,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
         when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
 
         SourceConfig sourceConfig = createDefaultSourceConfig();
-        try (InputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+        try (InputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
             resource.registerSource(
                     actualTenant,
                     actualNamespace,
@@ -545,7 +545,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     "Tenant is not provided");
         } catch (RestException re) {
@@ -565,7 +565,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     "Namespace is not provided");
         } catch (RestException re) {
@@ -585,7 +585,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     "Source name is not provided");
         } catch (RestException re) {
@@ -654,7 +654,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     -2,
                     "Source parallelism must be a positive number");
         } catch (RestException re) {
@@ -668,7 +668,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
         try {
             mockWorkerUtils();
 
-            try (FileInputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+            try (FileInputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
                 testUpdateSourceMissingArguments(
                         TENANT,
                         NAMESPACE,
@@ -677,7 +677,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                         mockedFormData,
                         outputTopic,
                         outputSerdeClassName,
-                        TWITTER_FIRE_HOSE,
+                        DATAGEN_SOURCE,
                         PARALLELISM + 1,
                         null);
             }
@@ -691,7 +691,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     public void testUpdateSourceChangedTopic() throws Exception {
         mockWorkerUtils();
 
-        try (FileInputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+        try (FileInputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
             testUpdateSourceMissingArguments(
                     TENANT,
                     NAMESPACE,
@@ -700,7 +700,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     "DifferentTopic",
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     PARALLELISM,
                     null);
         }
@@ -756,7 +756,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
         // no changes but set the auth-update flag to true, should not fail
         UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
         updateOptions.setUpdateAuthData(true);
-        try (InputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+        try (InputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
             resource.updateSource(
                     sourceConfig.getTenant(),
                     sourceConfig.getNamespace(),
@@ -784,7 +784,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     mockedFormData,
                     outputTopic,
                     outputSerdeClassName,
-                    TWITTER_FIRE_HOSE,
+                    DATAGEN_SOURCE,
                     0,
                     "Source parallelism must be a positive number");
         } catch (RestException re) {
@@ -861,7 +861,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     }
 
     private void updateDefaultSource() throws Exception {
-        
updateDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
+        
updateDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
     }
 
     private void updateDefaultSourceWithPackageUrl(String packageUrl) throws 
Exception {
@@ -910,7 +910,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
                     
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
             when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
 
-            try (InputStream inputStream = new 
FileInputStream(getPulsarIOTwitterNar())) {
+            try (InputStream inputStream = new 
FileInputStream(getPulsarIODataGenNar())) {
                 resource.updateSource(
                         TENANT,
                         NAMESPACE,
@@ -940,7 +940,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
     public void testUpdateSourceWithUrl() throws Exception {
         Configurator.setRootLevel(Level.DEBUG);
 
-        String filePackageUrl = getPulsarIOTwitterNar().toURI().toString();
+        String filePackageUrl = getPulsarIODataGenNar().toURI().toString();
 
         SourceConfig sourceConfig = createDefaultSourceConfig();
 
@@ -1454,7 +1454,7 @@ public class SourceApiV3ResourceTest extends 
AbstractFunctionsResourceTest {
         sourceConfig.setTenant(TENANT);
         sourceConfig.setNamespace(NAMESPACE);
         sourceConfig.setName(source);
-        sourceConfig.setClassName(TWITTER_FIRE_HOSE);
+        sourceConfig.setClassName(DATAGEN_SOURCE);
         sourceConfig.setParallelism(PARALLELISM);
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 6acccdda121..47cc31d5a53 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Record;
 
 /**
  * Pulsar's Push Source interface. PushSource read data from
- * external sources (database changes, twitter firehose, etc)
+ * external sources (database changes, etc)
  * and publish to a Pulsar topic. The reason its called Push is
  * because PushSources get passed a consumer that they
  * invoke whenever they have data to be published to Pulsar.
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 2f0e2c002ab..143643843c6 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -207,11 +207,6 @@
       <artifactId>pulsar-io-solr</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-twitter</artifactId>
-      <version>${project.version}</version>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6e7614e6c19..570b7b6a253 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -49,7 +49,6 @@
         <module>common</module>
         <module>docs</module>
         <module>aws</module>
-        <module>twitter</module>
         <module>cassandra</module>
         <module>aerospike</module>
         <module>http</module>
@@ -87,7 +86,6 @@
         <module>batch-data-generator</module>
         <module>common</module>
         <module>aws</module>
-        <module>twitter</module>
         <module>cassandra</module>
         <module>aerospike</module>
         <module>http</module>
@@ -139,9 +137,9 @@
         <module>batch-discovery-triggerers</module>
         <module>batch-data-generator</module>
         <module>common</module>
-        <module>twitter</module>
         <module>cassandra</module>
         <module>data-generator</module>
+        <module>netty</module>
       </modules>
     </profile>
   </profiles>
diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
deleted file mode 100644
index 229e551f2e0..00000000000
--- a/pulsar-io/twitter/pom.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
-    <version>4.2.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-io-twitter</artifactId>
-  <name>Pulsar IO :: Twitter</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.dataformat</groupId>
-      <artifactId>jackson-dataformat-yaml</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>hbc-core</artifactId>
-      <version>${hbc-core.version}</version>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-collections4</artifactId>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
-    
-    <dependency>
-       <groupId>${project.groupId}</groupId>
-       <artifactId>pulsar-io-common</artifactId>
-       <version>${project.version}</version>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-deploy-plugin</artifactId>
-        <configuration>
-          <skip>${skipDeployConnector}</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-maven-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
deleted file mode 100644
index 30950ffefe2..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.common.DelimitedStreamReader;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pulsar.io.common.IOConfigUtils;
-import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.pulsar.io.twitter.data.TweetData;
-import org.apache.pulsar.io.twitter.data.TwitterRecord;
-import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple Push based Twitter FireHose Source.
- */
-@Connector(
-    name = "twitter",
-    type = IOType.SOURCE,
-    help = "A simple connector moving tweets from Twitter FireHose to Pulsar",
-    configClass = TwitterFireHoseConfig.class
-)
-@Slf4j
-public class TwitterFireHose extends PushSource<TweetData> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TwitterFireHose.class);
-
-    // ----- Runtime fields
-    private Object waitObject;
-
-    private final ObjectMapper mapper = new ObjectMapper().configure(
-            DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws IOException {
-        TwitterFireHoseConfig hoseConfig = 
IOConfigUtils.loadWithSecrets(config,
-                TwitterFireHoseConfig.class, sourceContext);
-        hoseConfig.validate();
-        waitObject = new Object();
-        startThread(hoseConfig);
-    }
-
-    @Override
-    public void close() throws Exception {
-        stopThread();
-    }
-
-    private void startThread(TwitterFireHoseConfig config) {
-
-        BasicClient client = new ClientBuilder()
-                .name(config.getClientName())
-                .hosts(config.getClientHosts())
-                .endpoint(getEndpoint(config))
-                .authentication(getAuthentication(config))
-                .processor(new HosebirdMessageProcessor() {
-                    public DelimitedStreamReader reader;
-
-                    @Override
-                    public void setup(InputStream input) {
-                        reader = new DelimitedStreamReader(input, 
Constants.DEFAULT_CHARSET,
-                            config.getClientBufferSize());
-                    }
-
-                    @Override
-                    public boolean process() throws IOException, 
InterruptedException {
-                        String tweetStr = reader.readLine();
-                        try {
-                            TweetData tweet = mapper.readValue(tweetStr, 
TweetData.class);
-                            // We don't really care if the record succeeds or 
not.
-                            // However might be in the future to count failures
-                            // TODO:- Figure out the metrics story for 
connectors
-                            consume(new TwitterRecord(tweet, 
config.getGuestimateTweetTime()));
-                        } catch (Exception e) {
-                            LOG.error("Exception thrown", e);
-                        }
-                        return true;
-                    }
-                })
-                .build();
-
-        Thread runnerThread = new Thread(() -> {
-            LOG.info("Started the Twitter FireHose Runner Thread");
-            client.connect();
-            LOG.info("Twitter Streaming API connection established 
successfully");
-
-            // just wait now
-            try {
-                synchronized (waitObject) {
-                    waitObject.wait();
-                }
-            } catch (Exception e) {
-                LOG.info("Got a exception in waitObject");
-            }
-            LOG.debug("Closing Twitter Streaming API connection");
-            client.stop();
-            LOG.info("Twitter Streaming API connection closed");
-            LOG.info("Twitter FireHose Runner Thread ending");
-        });
-        runnerThread.setName("TwitterFireHoseRunner");
-        runnerThread.start();
-    }
-
-    private void stopThread() {
-        LOG.info("Source closed");
-        synchronized (waitObject) {
-            waitObject.notify();
-        }
-    }
-
-    private Authentication getAuthentication(TwitterFireHoseConfig config) {
-        return new OAuth1(config.getConsumerKey(),
-                config.getConsumerSecret(),
-                config.getToken(),
-                config.getTokenSecret());
-    }
-
-    private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) {
-        List<Long> followings = config.getFollowings();
-        List<String> terms = config.getTrackTerms();
-
-        if (CollectionUtils.isEmpty(followings) && 
CollectionUtils.isEmpty(terms)) {
-            return new SampleStatusesEndpoint().createEndpoint();
-        } else {
-            StatusesFilterEndpoint hosebirdEndpoint = new 
StatusesFilterEndpoint();
-
-            if (CollectionUtils.isNotEmpty(followings)) {
-               hosebirdEndpoint.followings(followings);
-            }
-
-            if (CollectionUtils.isNotEmpty(terms)) {
-               hosebirdEndpoint.trackTerms(terms);
-            }
-
-            return hosebirdEndpoint;
-        }
-    }
-}
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
deleted file mode 100644
index bb02cdde38b..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.collect.Lists;
-import com.twitter.hbc.core.Constants;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-/**
- * Configuration object for the Twitter Firehose Connector.
- */
-@Data
-@Accessors(chain = true)
-public class TwitterFireHoseConfig implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        sensitive = true,
-        help = "Your twitter app consumer key. See "
-                + 
"https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
 for details"
-    )
-    private String consumerKey;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        sensitive = true,
-        help = "Your twitter app consumer secret. "
-                + "See 
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
 for details"
-    )
-    private String consumerSecret;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        sensitive = true,
-        help = "Your twitter app token. "
-                + "See 
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
 for details"
-    )
-    private String token;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        sensitive = true,
-        help = "Your twitter app token secret. "
-                + "See 
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
 for details"
-    )
-    private String tokenSecret;
-
-    // Most firehose events have null createdAt time. If this parameter is set 
to true
-    // then we estimate the createdTime of each firehose event to be current 
time.
-    @FieldDoc(
-        required = false,
-        defaultValue = "false",
-        help = "Most firehose events have null createdAt time.If this 
parameter is set to true, "
-                + "the connector estimates the createdTime of each firehose 
event to be current time."
-    )
-    private Boolean guestimateTweetTime = false;
-
-    // ------ Optional property keys
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "pulsario-twitter-source",
-        help = "The Twitter Firehose Client name"
-    )
-    private String clientName = "pulsario-twitter-source";
-
-    @FieldDoc(
-        required = false,
-        defaultValue = Constants.STREAM_HOST,
-        help = "The Twitter Firehose stream hosts that the connector connects 
to"
-    )
-    private String clientHosts = Constants.STREAM_HOST;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "50000",
-        help = "The Twitter Firehose client buffer size"
-    )
-    private int clientBufferSize = 50000;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "",
-        help = "A comma separated list of user IDs, indicating the users to 
return statuses for in the stream."
-    )
-    private String followings;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "",
-        help = "Keywords to track. Phrases of keywords are specified by a 
comma-separated list."
-    )
-    private String terms;
-
-    public static TwitterFireHoseConfig load(String yamlFile) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), 
TwitterFireHoseConfig.class);
-    }
-
-    public static TwitterFireHoseConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(mapper.writeValueAsString(map), 
TwitterFireHoseConfig.class);
-    }
-
-    public void validate() throws IllegalArgumentException {
-        if (getConsumerKey() == null || getConsumerSecret() == null
-             || getToken() == null || getTokenSecret() == null) {
-            throw new IllegalArgumentException("Required property not set.");
-        }
-    }
-
-    public List<Long> getFollowings() {
-        if (StringUtils.isBlank(followings)) {
-            return Collections.emptyList();
-        }
-
-        List<Long> result = new ArrayList<Long> ();
-
-        for (String s: StringUtils.split(followings, ",")) {
-            try {
-                result.add(Long.parseLong(StringUtils.trim(s)));
-            } catch (NumberFormatException nfEx) {
-                // Ignore these
-            }
-        }
-
-        return CollectionUtils.isEmpty(result) ? Collections.emptyList() : 
result;
-    }
-
-    public List<String> getTrackTerms() {
-        if (StringUtils.isBlank(terms)) {
-            return Collections.emptyList();
-        }
-
-        return Lists.newArrayList(StringUtils.split(terms, ","));
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
deleted file mode 100644
index 0b9a3199fa7..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
-
-import lombok.Data;
-
-/**
- * POJO for Tweet object.
- */
-@Data
-public class TweetData {
-    private String createdAt;
-    private Long id;
-    private String idStr;
-    private String text;
-    private String source;
-    private Boolean truncated;
-    private User user;
-    private RetweetedStatus retweetedStatus;
-    private Boolean isQuoteStatus;
-    private Long quoteCount;
-    private Long replyCount;
-    private Long retweetCount;
-    private Long favoriteCount;
-    private Boolean favorited;
-    private Boolean retweeted;
-    private String filterLevel;
-    private String lang;
-    private String timestampMs;
-    private Delete delete;
-
-    /**
-     * POJO for Twitter User object.
-     */
-    @Data
-    public static class User {
-        private Long id;
-        private String idStr;
-        private String name;
-        private String screenName;
-        private String location;
-        private String description;
-        private String translatorType;
-        private Boolean protectedUser;
-        private Boolean verified;
-        private Long followersCount;
-        private Long friendsCount;
-        private Long listedCount;
-        private Long favouritesCount;
-        private Long statusesCount;
-        private String createdAt;
-        private Boolean geoEnabled;
-        private String lang;
-        private Boolean contributorsEnabled;
-        private Boolean isTranslator;
-        private String profileBackgroundColor;
-        private String profileBackgroundImageUrl;
-        private String profileBackgroundImageUrlHttps;
-        private Boolean profileBackgroundTile;
-        private String profileLinkColor;
-        private String profileSidebarBorderColor;
-        private String profileSidebarFillColor;
-        private String profileTextColor;
-        private Boolean profileUseBackgroundImage;
-        private String profileImageUrl;
-        private String profileImageUrlHttps;
-        private String profileBannerUrl;
-        private Boolean defaultProfile;
-        private Boolean defaultProfileImage;
-    }
-
-    /**
-     * POJO for Re-Tweet object.
-     */
-    @Data
-    public static class RetweetedStatus {
-        private String createdAt;
-        private Long id;
-        private String idStr;
-        private String text;
-        private String source;
-        private Boolean truncated;
-        private User user;
-        private Boolean isQuoteStatus;
-        private Long quoteCount;
-        private Long replyCount;
-        private Long retweetCount;
-        private Long favoriteCount;
-        private Boolean favorited;
-        private Boolean retweeted;
-        private String filterLevel;
-        private String lang;
-    }
-
-    /**
-     * POJO for Tweet Status object.
-     */
-    @Data
-    public static class Status {
-        private Long id;
-        private String idStr;
-        private Long userId;
-        private String userIdStr;
-    }
-
-    /**
-     * POJO for Tweet Delete object.
-     */
-    @Data
-    public static class Delete {
-        private Status status;
-        private String timestampMs;
-    }
-}
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
deleted file mode 100644
index 7292c05548b..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Optional;
-import org.apache.pulsar.functions.api.Record;
-
-/**
- * Twitter Record object.
- */
-public class TwitterRecord implements Record<TweetData> {
-    private final TweetData tweet;
-    private static final SimpleDateFormat dateFormat = new 
SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
-    private final boolean guestimateTweetTime;
-
-    public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
-        this.tweet = tweet;
-        this.guestimateTweetTime = guestimateTweetTime;
-    }
-
-    @Override
-    public Optional<String> getKey() {
-        // TODO: Could use user or tweet ID as key here
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<Long> getEventTime() {
-        try {
-            if (tweet.getCreatedAt() != null) {
-                Date d = dateFormat.parse(tweet.getCreatedAt());
-                return Optional.of(d.toInstant().toEpochMilli());
-            } else if (guestimateTweetTime) {
-                return Optional.of(System.currentTimeMillis());
-            } else {
-                return Optional.empty();
-            }
-        } catch (Exception e) {
-            return Optional.empty();
-        }
-    }
-
-    @Override
-    public TweetData getValue() {
-        return tweet;
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
deleted file mode 100644
index eadf21b6a89..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
deleted file mode 100644
index ca942076a16..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.endpoint;
-
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import java.io.Serializable;
-
-/**
- * Required for Twitter Client.
- */
-public class SampleStatusesEndpoint implements Serializable {
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-
-    public StreamingEndpoint createEndpoint() {
-        // Returns the sample endpoint: Returning a sample from the firehose 
(all tweets)
-        StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
-        endpoint.stallWarnings(false);
-        endpoint.delimited(false);
-        return endpoint;
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
deleted file mode 100644
index 2bca50c06c1..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.endpoint;
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
deleted file mode 100644
index 23f810a0240..00000000000
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 39d7d974a4e..00000000000
--- a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: twitter
-description: Ingest data from Twitter firehose
-sourceClass: org.apache.pulsar.io.twitter.TwitterFireHose
-sourceConfigClass: org.apache.pulsar.io.twitter.TwitterFireHoseConfig
diff --git 
a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
 
b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
deleted file mode 100644
index 6f5f99ce260..00000000000
--- 
a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.testng.annotations.Test;
-
-public class TwitterFireHoseConfigTest {
-
-    private TwitterFireHoseConfig config;
-
-    @Test
-    public final void loadFromYamlFileTest() throws IOException {
-        File yamlFile = getFile("sourceConfig.yaml");
-        config = TwitterFireHoseConfig.load(yamlFile.getAbsolutePath());
-        assertNotNull(config);
-    }
-
-    @Test
-    public final void loadFromMapTest() throws IOException {
-        Map<String, Object> map = new HashMap<> ();
-        map.put("consumerKey", "xxx");
-        map.put("consumerSecret", "xxx");
-        map.put("token", "xxx");
-        map.put("tokenSecret", "xxx");
-
-        config = TwitterFireHoseConfig.load(map);
-
-        assertNotNull(config);
-    }
-
-    @Test
-    public final void validValidateTest() throws IOException {
-        Map<String, Object> map = new HashMap<> ();
-        map.put("consumerKey", "xxx");
-        map.put("consumerSecret", "xxx");
-        map.put("token", "xxx");
-        map.put("tokenSecret", "xxx");
-
-        config = TwitterFireHoseConfig.load(map);
-        config.validate();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "Required property not set.")
-    public final void missingConsumerKeyValidateTest() throws IOException {
-        Map<String, Object> map = new HashMap<> ();
-
-        config = TwitterFireHoseConfig.load(map);
-        config.validate();
-    }
-
-    @Test
-    public final void getFollowingsTest() throws IOException {
-        Map<String, Object> map = new HashMap<> ();
-        map.put("followings", "123, 456, 789");
-        config = TwitterFireHoseConfig.load(map);
-
-        List<Long> followings = config.getFollowings();
-        assertNotNull(followings);
-        assertEquals(followings.size(), 3);
-        assertTrue(followings.contains(123L));
-        assertTrue(followings.contains(456L));
-        assertTrue(followings.contains(789L));
-    }
-
-
-    @Test
-    public final void getTermsTest() throws IOException {
-        Map<String, Object> map = new HashMap<> ();
-        map.put("terms", "mickey, donald, goofy");
-        config = TwitterFireHoseConfig.load(map);
-
-        List<String> terms = config.getTrackTerms();
-        assertNotNull(terms);
-        assertEquals(terms.size(), 3);
-        assertTrue(terms.contains("mickey"));
-    }
-
-    private File getFile(String name) {
-        ClassLoader classLoader = getClass().getClassLoader();
-        return new File(classLoader.getResource(name).getFile());
-    }
-
-}
diff --git a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml 
b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
deleted file mode 100644
index 9ac5708e37f..00000000000
--- a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"consumerKey": "",
-"consumerSecret": ""
-}
\ No newline at end of file

Reply via email to