[ https://issues.apache.org/jira/browse/BAHIR-116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994559#comment-15994559 ]
ASF GitHub Bot commented on BAHIR-116: -------------------------------------- Github user amarouni commented on a diff in the pull request: https://github.com/apache/bahir/pull/42#discussion_r114494439 --- Diff: streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import com.google.api.client.auth.oauth2.Credential +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport +import com.google.api.client.googleapis.json.GoogleJsonResponseException +import com.google.api.client.json.jackson2.JacksonFactory +import com.google.api.services.pubsub.Pubsub.Builder +import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest} +import com.google.cloud.hadoop.util.RetryHttpInitializer + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils + +/** + * Input stream that subscribe messages from Google cloud Pub/Sub subscription. + * @param project Google cloud project id + * @param subscription Pub/Sub subscription name + * @param credential Google cloud project credential to access Pub/Sub service + */ +private[streaming] +class PubsubInputDStream( + _ssc: StreamingContext, + val project: String, + val subscription: String, + val credential: SparkGCPCredentials, + val _storageLevel: StorageLevel +) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) { + + override def getReceiver(): Receiver[SparkPubsubMessage] = { + new PubsubReceiver(project, subscription, credential, _storageLevel) + } +} + +/** + * A wrapper class for PubsubMessage's with a custom serialization format. + * + * This is necessary because PubsubMessage uses inner data structures + * which are not serializable. 
+ */ +class SparkPubsubMessage() extends Externalizable { + + private[pubsub] var message = new PubsubMessage + + def getData(): Array[Byte] = message.decodeData() + + def getAttributes(): java.util.Map[String, String] = message.getAttributes + + def getMessageId(): String = message.getMessageId + + def getPublishTime(): String = message.getPublishTime + + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { + message.decodeData() match { + case null => out.writeInt(-1) + case data => + out.writeInt(data.size) + out.write(data) + } + + message.getMessageId match { + case null => out.writeInt(-1) + case id => + val idBuff = Utils.serialize(id) + out.writeInt(idBuff.length) + out.write(idBuff) + } + + message.getPublishTime match { + case null => out.writeInt(-1) + case time => + val publishTimeBuff = Utils.serialize(time) + out.writeInt(publishTimeBuff.length) + out.write(publishTimeBuff) + } + + message.getAttributes match { + case null => out.writeInt(-1) + case attrs => + out.writeInt(attrs.size()) + for ((k, v) <- message.getAttributes.asScala) { + val keyBuff = Utils.serialize(k) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } + } + + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { + in.readInt() match { + case -1 => message.encodeData(null) + case bodyLength => + val data = new Array[Byte](bodyLength) + in.readFully(data) + message.encodeData(data) + } + + in.readInt() match { + case -1 => message.setMessageId(null) + case idLength => + val idBuff = new Array[Byte](idLength) + in.readFully(idBuff) + val id: String = Utils.deserialize(idBuff) + message.setMessageId(id) + } + + in.readInt() match { + case -1 => message.setPublishTime(null) + case publishTimeLength => + val publishTimeBuff = new Array[Byte](publishTimeLength) + in.readFully(publishTimeBuff) + val publishTime: String = 
Utils.deserialize(publishTimeBuff) + message.setPublishTime(publishTime) + } + + in.readInt() match { + case -1 => message.setAttributes(null) + case numAttributes => + val attributes = new java.util.HashMap[String, String] + for (i <- 0 until numAttributes) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.readFully(keyBuff) + val key: String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.readFully(valBuff) + val value: String = Utils.deserialize(valBuff) + + attributes.put(key, value) + } + message.setAttributes(attributes) + } + } +} + +private [pubsub] +object ConnectionUtils { + val transport = GoogleNetHttpTransport.newTrustedTransport(); + val jacksonFactory = JacksonFactory.getDefaultInstance; + + /** + * Client can retry with these response status + */ + val RESOURCE_EXHAUSTED = 429 + + val CANCELLED = 499 + + val INTERNAL = 500 + + val UNAVAILABLE = 503 + + val DEADLINE_EXCEEDED = 504 + + def retryable(status: Int): Boolean = { + status match { + case RESOURCE_EXHAUSTED | CANCELLED | INTERNAL | UNAVAILABLE | DEADLINE_EXCEEDED => true + case _ => false + } + } +} + + +private[pubsub] +class PubsubReceiver( + project: String, + subscription: String, + credential: SparkGCPCredentials, + storageLevel: StorageLevel) + extends Receiver[SparkPubsubMessage](storageLevel) { + + val APP_NAME = "sparkstreaming-pubsub-receiver" + + val INIT_BACKOFF = 100 // 100ms --- End diff -- @bchen-talend Can this value (and the other values beneath it) be supplied by the user ? 
> Add Spark streaming connector for Google cloud Pub/Sub > ------------------------------------------------------ > > Key: BAHIR-116 > URL: https://issues.apache.org/jira/browse/BAHIR-116 > Project: Bahir > Issue Type: New Feature > Components: Spark Streaming Connectors > Reporter: Bin Chen > > A Spark Streaming connector for [Google Pub/Sub|https://cloud.google.com/pubsub/] -- This message was sent by Atlassian JIRA (v6.3.15#6346)