[
https://issues.apache.org/jira/browse/BAHIR-116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996526#comment-15996526
]
ASF GitHub Bot commented on BAHIR-116:
--------------------------------------
Github user bchen-talend commented on a diff in the pull request:
https://github.com/apache/bahir/pull/42#discussion_r114743438
--- Diff:
streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.pubsub
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
+import com.google.api.client.json.jackson2.JacksonFactory
+import com.google.api.services.pubsub.Pubsub.Builder
+import com.google.api.services.pubsub.model.PublishRequest
+import com.google.api.services.pubsub.model.PubsubMessage
+import com.google.cloud.hadoop.util.RetryHttpInitializer
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.pubsub.ConnectionUtils
+import org.apache.spark.streaming.pubsub.PubsubTestUtils
+import org.apache.spark.streaming.pubsub.PubsubUtils
+import org.apache.spark.streaming.pubsub.SparkGCPCredentials
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from a Google Cloud Pub/Sub subscription and does
wordcount.
+ * In this example it use application default credentials, so need to use
gcloud
+ * client to generate token file before running example
+ *
+ * Usage: PubsubWordCount <projectId> <subscription>
+ * <projectId> is the name of Google cloud
+ * <subscription> is the subscription to a topic
+ *
+ * Example:
+ * # use gcloud client generate token file
+ * $ gcloud init
+ * $ gcloud auth application-default login
+ *
+ * # run the example
+ * $ bin/run-example \
+ * org.apache.spark.examples.streaming.pubsub.PubsubWordCount
project_1 subscription_1
+ *
+ */
+object PubsubWordCount {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ System.err.println(
+ """
+ |Usage: PubsubWordCount <projectId> <subscription>
+ |
+ | <projectId> is the name of Google cloud
+ | <subscription> is the subscription to a topic
+ |
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ val Seq(projectId, subscription) = args.toSeq
+
+ val sparkConf = new SparkConf().setAppName("PubsubWordCount")
+ val ssc = new StreamingContext(sparkConf, Milliseconds(2000))
+
+ val pubsubStream = PubsubUtils.createStream(ssc, projectId,
subscription,
+ SparkGCPCredentials.builder.build(),
StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ val wordCounts =
+ pubsubStream.map(message => (new String(message.getData()),
1)).reduceByKey(_ + _)
+
+ wordCounts.print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+
+}
+
+/**
--- End diff --
done
> Add Spark streaming connector for Google cloud Pub/Sub
> ------------------------------------------------------
>
> Key: BAHIR-116
> URL: https://issues.apache.org/jira/browse/BAHIR-116
> Project: Bahir
> Issue Type: New Feature
> Components: Spark Streaming Connectors
> Reporter: Bin Chen
>
> A spark streaming connector for [Google
> Pub/Sub|https://cloud.google.com/pubsub/]
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)