merlimat closed pull request #1520: Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 30b3d5e5e2..2ea3f5235c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@ flexible messaging model and an intuitive client 
API.</description>
     <module>pulsar-log4j2-appender</module>
     <!-- functions related modules -->
     <module>pulsar-functions</module>
+    <!-- connector related modules -->
+    <module>pulsar-connect</module>
   </modules>
 
   <issueManagement>
@@ -139,6 +141,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <gson.version>2.8.2</gson.version>
     <sketches.version>0.8.3</sketches.version>
     <jctools.version>2.1.1</jctools.version>
+    <hbc-core.version>2.2.0</hbc-core.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml
new file mode 100644
index 0000000000..dea6d626cc
--- /dev/null
+++ b/pulsar-connect/core/pom.xml
@@ -0,0 +1,33 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-core</artifactId>
+  <name>Pulsar Connect :: Connect</name>
+
+</project>
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
new file mode 100644
index 0000000000..65b006bf6f
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Pulsar's Push Source interface. A PushSource reads data from
+ * external sources (database changes, twitter firehose, etc.)
+ * and publishes it to a Pulsar topic. The reason it's called Push is
+ * because PushSources get passed a consumption Function that they
+ * invoke whenever they have data to be published to Pulsar.
+ * The lifecycle of a PushSource is to open it, passing any config needed
+ * by it to initialize (like open network connection, authenticate, etc.).
+ * A consumer Function is then attached to it, which is invoked by the source
+ * whenever there is data to be published. Once all data has been read, one
+ * can use close at the end of the session to do any cleanup.
+ */
+public interface PushSource<T> extends AutoCloseable {
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, String> config) throws Exception;
+
+    /**
+     * Attach a consumer function to this Source. This is invoked by the implementation
+     * to pass messages whenever there is data to be pushed to Pulsar.
+     * @param consumer the {@code Function<T, CompletableFuture<Void>>} to invoke for each message
+     */
+    void setConsumer(Function<T, CompletableFuture<Void>> consumer);
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
new file mode 100644
index 0000000000..e22eb0f20b
--- /dev/null
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Pulsar's Sink interface. A Sink reads data from a Pulsar topic and
+ * writes it to external sinks (kv store, database, filesystem, etc.).
+ * The lifecycle of a Sink is to open it, passing any config needed
+ * by it to initialize (like open network connection, authenticate, etc.).
+ * On every message from the designated Pulsar topic, the write method is
+ * invoked, which writes the message to the external sink. One can use close
+ * at the end of the session to do any cleanup.
+ */
+public interface Sink<T> extends AutoCloseable {
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, String> config) throws Exception;
+
+    /**
+     * Attempt to publish a single type-safe message
+     *
+     * @param message Object to publish to the sink
+     * @return Completable future for async publish request
+     */
+    CompletableFuture<Void> write(final T message);
+}
\ No newline at end of file
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
new file mode 100644
index 0000000000..bf1303fa74
--- /dev/null
+++ b/pulsar-connect/pom.xml
@@ -0,0 +1,39 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect</artifactId>
+  <name>Pulsar Connect :: Parent</name>
+
+  <modules>
+    <module>core</module>
+    <module>twitter</module>
+  </modules>
+
+</project>
diff --git a/pulsar-connect/twitter/pom.xml b/pulsar-connect/twitter/pom.xml
new file mode 100644
index 0000000000..ae9221479e
--- /dev/null
+++ b/pulsar-connect/twitter/pom.xml
@@ -0,0 +1,61 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-twitter</artifactId>
+  <name>Pulsar Connect :: Twitter</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>hbc-core</artifactId>
+      <version>${hbc-core.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
new file mode 100644
index 0000000000..9f6ebbe66c
--- /dev/null
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.twitter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+
+import org.apache.pulsar.connect.core.PushSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.common.DelimitedStreamReader;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Simple Push based Twitter FireHose Source.
+ * <p>
+ * {@link #open(Map)} checks the OAuth credentials and starts a runner thread that connects an hbc
+ * client to the sample statuses endpoint. Every line read from the stream is handed to the function
+ * registered through {@link #setConsumer(Function)}. {@link #close()} signals the runner thread,
+ * which then stops the client.
+ */
+public class TwitterFireHose implements PushSource<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
+
+    // ----- Runtime fields
+
+    // One-shot stop signal for the runner thread, created by open() and released by close().
+    // A latch is used instead of wait()/notify() because it remembers a close() that happens
+    // before the runner thread starts waiting and it cannot wake up spuriously.
+    private volatile CountDownLatch stopSignal;
+
+    // Set by setConsumer() and read from the message processor's process() callback;
+    // volatile because those calls presumably happen on different threads.
+    private volatile Function<String, CompletableFuture<Void>> consumeFunction;
+
+    /**
+     * Loads the source configuration, validates it and starts streaming.
+     *
+     * @param config connector properties; consumerKey, consumerSecret, token and tokenSecret are required
+     * @throws IOException if {@link TwitterFireHoseConfig#load(Map)} fails
+     * @throws IllegalArgumentException if one of the four required properties is missing
+     */
+    @Override
+    public void open(Map<String, String> config) throws IOException {
+        TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
+        // All four OAuth values are mandatory; each one is rejected when it is absent.
+        if (hoseConfig.getConsumerKey() == null
+                || hoseConfig.getConsumerSecret() == null
+                || hoseConfig.getToken() == null
+                || hoseConfig.getTokenSecret() == null) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+        CountDownLatch stop = new CountDownLatch(1);
+        stopSignal = stop;
+        startThread(hoseConfig, stop);
+    }
+
+    /**
+     * Registers the function that is called with every line read from the stream.
+     *
+     * @param consumeFunction callback invoked with each line; the future it returns is not inspected
+     */
+    @Override
+    public void setConsumer(Function<String, CompletableFuture<Void>> consumeFunction) {
+        this.consumeFunction = consumeFunction;
+    }
+
+    /**
+     * Signals the runner thread to stop the client. Does nothing if the source was never opened.
+     */
+    @Override
+    public void close() throws Exception {
+        stopThread();
+    }
+
+    // ------ Custom endpoints
+
+    /**
+     * Factory for the streaming endpoint the client connects to.
+     * NOTE(review): this class currently always uses {@link SampleStatusesEndpoint}; nothing here
+     * lets a caller plug in another implementation.
+     */
+    public interface EndpointInitializer {
+        StreamingEndpoint createEndpoint();
+    }
+
+    /**
+     * Default initializer: the sample statuses endpoint with stall warnings and delimiting disabled.
+     */
+    private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
+        @Override
+        public StreamingEndpoint createEndpoint() {
+            // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets)
+            StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+            endpoint.stallWarnings(false);
+            endpoint.delimited(false);
+            return endpoint;
+        }
+    }
+
+    /**
+     * Builds the hbc client and starts the thread that connects it and, once released, stops it.
+     *
+     * @param config validated source configuration
+     * @param stop latch that is released when the source is closed
+     */
+    private void startThread(TwitterFireHoseConfig config, CountDownLatch stop) {
+        Authentication auth = new OAuth1(config.getConsumerKey(),
+                config.getConsumerSecret(),
+                config.getToken(),
+                config.getTokenSecret());
+
+        BasicClient client = new ClientBuilder()
+                .name(config.getClientName())
+                .hosts(config.getClientHosts())
+                .endpoint(new SampleStatusesEndpoint().createEndpoint())
+                .authentication(auth)
+                .processor(new HosebirdMessageProcessor() {
+                    private DelimitedStreamReader reader;
+
+                    @Override
+                    public void setup(InputStream input) {
+                        reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET,
+                            config.getClientBufferSize());
+                    }
+
+                    @Override
+                    public boolean process() throws IOException, InterruptedException {
+                        String line = reader.readLine();
+                        try {
+                            // We don't really care if the future succeeds or not.
+                            // However might be in the future to count failures
+                            // TODO:- Figure out the metrics story for connectors
+                            consumeFunction.apply(line);
+                        } catch (Exception e) {
+                            // Keep consuming the stream, but record the cause of the failure.
+                            LOG.error("Exception thrown", e);
+                        }
+                        return true;
+                    }
+                })
+                .build();
+
+        Thread runnerThread = new Thread(() -> {
+            LOG.info("Started the Twitter FireHose Runner Thread");
+            client.connect();
+            LOG.info("Twitter Streaming API connection established successfully");
+
+            // just wait now
+            boolean interrupted = false;
+            try {
+                stop.await();
+            } catch (InterruptedException e) {
+                // An interrupt is treated like a stop request: fall through and stop the client.
+                interrupted = true;
+                LOG.info("Got a exception in waitObject", e);
+            }
+            LOG.debug("Closing Twitter Streaming API connection");
+            client.stop();
+            LOG.info("Twitter Streaming API connection closed");
+            LOG.info("Twitter FireHose Runner Thread ending");
+            if (interrupted) {
+                // Restored only after client.stop(), in case that call blocks.
+                Thread.currentThread().interrupt();
+            }
+        });
+        runnerThread.setName("TwitterFireHoseRunner");
+        runnerThread.start();
+    }
+
+    /**
+     * Releases the runner thread so that it stops the client; a no-op when open() was never called.
+     */
+    private void stopThread() {
+        LOG.info("Source closed");
+        CountDownLatch stop = stopSignal;
+        if (stop != null) {
+            stop.countDown();
+        }
+    }
+
+}
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
new file mode 100644
index 0000000000..f0614bde78
--- /dev/null
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.twitter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+import com.twitter.hbc.core.Constants;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+/**
+ * Configuration of the Twitter FireHose source.
+ * <p>
+ * The four OAuth fields have no default value; the client name, hosts and buffer size fall back
+ * to the defaults declared below. Lombok's {@code @Data} supplies the getters, setters,
+ * {@code equals}/{@code hashCode} and {@code toString}; {@code @Accessors(chain = true)} makes
+ * the setters return {@code this}.
+ * <p>
+ * NOTE(review): the generated {@code toString()} includes {@code consumerSecret} and
+ * {@code tokenSecret}, so instances should not be written to logs.
+ */
+@Data
+@Accessors(chain = true)
+public class TwitterFireHoseConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------ OAuth credentials (no defaults)
+
+    private String consumerKey;
+    private String consumerSecret;
+    private String token;
+    private String tokenSecret;
+
+    // ------ Optional property keys
+
+    private String clientName = "openconnector-twitter-source";
+    private String clientHosts = Constants.STREAM_HOST;
+    private int clientBufferSize = 50000;
+
+    /**
+     * Reads a configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the configuration described by the file
+     * @throws IOException if the file cannot be read or mapped onto this class
+     */
+    public static TwitterFireHoseConfig load(String yamlFile) throws IOException {
+        final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+        final File source = new File(yamlFile);
+        return yamlMapper.readValue(source, TwitterFireHoseConfig.class);
+    }
+
+    /**
+     * Builds a configuration from a property map by serializing the map to JSON and reading
+     * that JSON back as a {@code TwitterFireHoseConfig}.
+     *
+     * @param map property names mapped to their values
+     * @return the configuration described by the map
+     * @throws IOException if the map cannot be serialized or mapped onto this class
+     */
+    public static TwitterFireHoseConfig load(Map<String, String> map) throws IOException {
+        final ObjectMapper jsonMapper = new ObjectMapper();
+        final String json = jsonMapper.writeValueAsString(map);
+        return jsonMapper.readValue(json, TwitterFireHoseConfig.class);
+    }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to