http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala deleted file mode 100644 index a477ba6..0000000 --- a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.external.hbase - -import java.io.{File, ObjectInputStream, ObjectOutputStream} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} -import org.apache.hadoop.security.UserGroupInformation - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.task.TaskContext -import io.gearpump.util.{Constants, FileUtils} - -class HBaseSink( - userconfig: UserConfig, tableName: String, @transient var configuration: Configuration) - extends DataSink{ - lazy val connection = HBaseSink.getConnection(userconfig, configuration) - lazy val table = connection.getTable(TableName.valueOf(tableName)) - - override def open(context: TaskContext): Unit = {} - - def this(userconfig: UserConfig, tableName: String) = { - this(userconfig, tableName, HBaseConfiguration.create()) - } - - def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { - insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), - Bytes.toBytes(columnName), Bytes.toBytes(value)) - } - - def insert( - rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) - : Unit = { - val put = new Put(rowKey) - put.addColumn(columnGroup, columnName, value) - table.put(put) - } - - def put(msg: Any): Unit = { - msg match { - case seq: Seq[Any] => - seq.foreach(put) - case tuple: (_, _, _, _) => { - tuple._1 match { - case str: String => { - insert(tuple._1.asInstanceOf[String], tuple._2.asInstanceOf[String], - tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String]) - } - case byteArray: Array[Byte@unchecked] => { - insert(tuple._1.asInstanceOf[Array[Byte]], tuple._2.asInstanceOf[Array[Byte]], - tuple._3.asInstanceOf[Array[Byte]], tuple._4.asInstanceOf[Array[Byte]]) - } - case _ 
=> - // Skip - } - } - } - } - - override def write(message: Message): Unit = { - put(message.msg) - } - - def close(): Unit = { - connection.close() - table.close() - } - - private def writeObject(out: ObjectOutputStream): Unit = { - out.defaultWriteObject() - configuration.write(out) - } - - private def readObject(in: ObjectInputStream): Unit = { - in.defaultReadObject() - val clientConf = new Configuration(false) - clientConf.readFields(in) - configuration = HBaseConfiguration.create(clientConf) - } -} - -object HBaseSink { - val HBASESINK = "hbasesink" - val TABLE_NAME = "hbase.table.name" - val COLUMN_FAMILY = "hbase.table.column.family" - val COLUMN_NAME = "hbase.table.column.name" - - def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = { - new HBaseSink(userconfig, tableName) - } - - def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) - : HBaseSink = { - new HBaseSink(userconfig, tableName, configuration) - } - - private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { - if (UserGroupInformation.isSecurityEnabled) { - val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) - val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE) - if (principal.isEmpty || keytabContent.isEmpty) { - val errorMsg = s"HBase is security enabled, user should provide kerberos principal in " + - s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " + - s"in ${Constants.GEARPUMP_KEYTAB_FILE}" - throw new Exception(errorMsg) - } - val keytabFile = File.createTempFile("login", ".keytab") - FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get) - keytabFile.setExecutable(false) - keytabFile.setWritable(false) - keytabFile.setReadable(true, true) - - UserGroupInformation.setConfiguration(configuration) - UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath) - keytabFile.delete() - } - 
ConnectionFactory.createConnection(configuration) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala deleted file mode 100644 index 11a20dc..0000000 --- a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.external.hbase.dsl - -import scala.language.implicitConversions - -import org.apache.hadoop.conf.Configuration - -import io.gearpump.cluster.UserConfig -import io.gearpump.external.hbase.HBaseSink -import io.gearpump.streaming.dsl.Stream -import io.gearpump.streaming.dsl.Stream.Sink - -/** Create a HBase DSL Sink */ -class HBaseDSLSink[T](stream: Stream[T]) { - - def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String) - : Stream[T] = { - stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description) - } - - def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, - parallism: Int, description: String): Stream[T] = { - stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description) - } -} - -object HBaseDSLSink { - implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = { - new HBaseDSLSink[T](stream) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala new file mode 100644 index 0000000..57751a4 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.external.hbase + +import java.io.{File, ObjectInputStream, ObjectOutputStream} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.{Constants, FileUtils} + +class HBaseSink( + userconfig: UserConfig, tableName: String, @transient var configuration: Configuration) + extends DataSink{ + lazy val connection = HBaseSink.getConnection(userconfig, configuration) + lazy val table = connection.getTable(TableName.valueOf(tableName)) + + override def open(context: TaskContext): Unit = {} + + def this(userconfig: UserConfig, tableName: String) = { + this(userconfig, tableName, HBaseConfiguration.create()) + } + + def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { + insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), + Bytes.toBytes(columnName), Bytes.toBytes(value)) + } + + def insert( + rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) + : Unit = { + val put = new Put(rowKey) + put.addColumn(columnGroup, columnName, value) + table.put(put) + } + + def put(msg: Any): Unit = { + msg match { + 
case seq: Seq[Any] => + seq.foreach(put) + case tuple: (_, _, _, _) => { + tuple._1 match { + case str: String => { + insert(tuple._1.asInstanceOf[String], tuple._2.asInstanceOf[String], + tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String]) + } + case byteArray: Array[Byte@unchecked] => { + insert(tuple._1.asInstanceOf[Array[Byte]], tuple._2.asInstanceOf[Array[Byte]], + tuple._3.asInstanceOf[Array[Byte]], tuple._4.asInstanceOf[Array[Byte]]) + } + case _ => + // Skip + } + } + } + } + + override def write(message: Message): Unit = { + put(message.msg) + } + + def close(): Unit = { + connection.close() + table.close() + } + + private def writeObject(out: ObjectOutputStream): Unit = { + out.defaultWriteObject() + configuration.write(out) + } + + private def readObject(in: ObjectInputStream): Unit = { + in.defaultReadObject() + val clientConf = new Configuration(false) + clientConf.readFields(in) + configuration = HBaseConfiguration.create(clientConf) + } +} + +object HBaseSink { + val HBASESINK = "hbasesink" + val TABLE_NAME = "hbase.table.name" + val COLUMN_FAMILY = "hbase.table.column.family" + val COLUMN_NAME = "hbase.table.column.name" + + def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = { + new HBaseSink(userconfig, tableName) + } + + def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) + : HBaseSink = { + new HBaseSink(userconfig, tableName, configuration) + } + + private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { + if (UserGroupInformation.isSecurityEnabled) { + val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) + val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE) + if (principal.isEmpty || keytabContent.isEmpty) { + val errorMsg = s"HBase is security enabled, user should provide kerberos principal in " + + s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " + + s"in ${Constants.GEARPUMP_KEYTAB_FILE}" 
+ throw new Exception(errorMsg) + } + val keytabFile = File.createTempFile("login", ".keytab") + FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get) + keytabFile.setExecutable(false) + keytabFile.setWritable(false) + keytabFile.setReadable(true, true) + + UserGroupInformation.setConfiguration(configuration) + UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath) + keytabFile.delete() + } + ConnectionFactory.createConnection(configuration) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala new file mode 100644 index 0000000..2417763 --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.external.hbase.dsl + +import scala.language.implicitConversions + +import org.apache.hadoop.conf.Configuration + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.external.hbase.HBaseSink +import org.apache.gearpump.streaming.dsl.Stream +import org.apache.gearpump.streaming.dsl.Stream.Sink + +/** Create a HBase DSL Sink */ +class HBaseDSLSink[T](stream: Stream[T]) { + + def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String) + : Stream[T] = { + stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description) + } + + def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, + parallism: Int, description: String): Stream[T] = { + stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description) + } +} + +object HBaseDSLSink { + implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = { + new HBaseDSLSink[T](stream) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala deleted file mode 100644 index cad6581..0000000 --- a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.external.hbase - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers { - - property("HBaseSink should insert a row successfully") { - - // import Mockito._ - // val htable = Mockito.mock(classOf[HTable]) - // val row = "row" - // val group = "group" - // val name = "name" - // val value = "1.2" - // val put = new Put(Bytes.toBytes(row)) - // put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - // val hbaseSink = HBaseSink(htable) - // hbaseSink.insert(put) - // verify(htable).put(put) - - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala new file mode 100644 index 0000000..24b9646 --- /dev/null +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.external.hbase + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers { + + property("HBaseSink should insert a row successfully") { + + // import Mockito._ + // val htable = Mockito.mock(classOf[HTable]) + // val row = "row" + // val group = "group" + // val name = "name" + // val value = "1.2" + // val put = new Put(Bytes.toBytes(row)) + // put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) + // val hbaseSink = HBaseSink(htable) + // hbaseSink.insert(put) + // verify(htable).put(put) + + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala deleted file mode 100644 index b482c7c..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka - -import java.util.Properties - -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer - -import io.gearpump.Message -import io.gearpump.streaming.kafka.lib.KafkaUtil -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.task.TaskContext - -/** - * kafka sink connectors that invokes org.apache.kafka.clients.producer.KafkaProducer to send - * messages to kafka queue - * @param getProducer is a function to construct a KafkaProducer - * @param topic is the kafka topic to write to - */ -class KafkaSink private[kafka]( - getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) extends DataSink { - - /** - * @param topic producer topic - * @param properties producer config - */ - def this(topic: String, properties: Properties) = { - this(() => KafkaUtil.createKafkaProducer(properties, - new ByteArraySerializer, new ByteArraySerializer), topic) - } - - /** - * - * creates an empty properties with `bootstrap.servers` set to `bootstrapServers` - * and invokes `KafkaSink(topic, properties)` - * @param topic producer topic - * @param bootstrapServers kafka producer config `bootstrap.servers` - */ - def this(topic: String, 
bootstrapServers: String) = { - this(topic, KafkaUtil.buildProducerConfig(bootstrapServers)) - } - - // Lazily construct producer since KafkaProducer is not serializable - private lazy val producer = getProducer() - - override def open(context: TaskContext): Unit = {} - - override def write(message: Message): Unit = { - val record = message.msg match { - case (k, v) => - new ProducerRecord[Array[Byte], Array[Byte]](topic, k.asInstanceOf[Array[Byte]], - v.asInstanceOf[Array[Byte]]) - case v => - new ProducerRecord[Array[Byte], Array[Byte]](topic, v.asInstanceOf[Array[Byte]]) - } - producer.send(record) - } - - override def close(): Unit = { - producer.close() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala deleted file mode 100644 index 1544445..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka - -import java.util.Properties -import scala.collection.mutable.ArrayBuffer -import scala.util.{Failure, Success} - -import kafka.common.TopicAndPartition -import org.slf4j.Logger - -import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} -import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil} -import io.gearpump.streaming.source.DefaultTimeStampFilter -import io.gearpump.streaming.task.TaskContext -import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty -import io.gearpump.streaming.transaction.api._ -import io.gearpump.util.LogUtil -import io.gearpump.{Message, TimeStamp} - -object KafkaSource { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) -} - -/** - * Kafka source connectors that pulls a batch of messages (`kafka.consumer.emit.batch.size`) - * from multiple Kafka TopicAndPartition in a round-robin way. - * - * This is a TimeReplayableSource which is able to replay messages given a start time. - * Each kafka message is tagged with a timestamp by - * [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping - * is stored to a [[io.gearpump.streaming.transaction.api.OffsetStorage]]. On recovery, - * we could retrieve the previously stored offset from the - * [[io.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read from - * there. - * - * kafka message is wrapped into gearpump [[io.gearpump.Message]] and further filtered by a - * [[io.gearpump.streaming.transaction.api.TimeStampFilter]] - * such that obsolete messages are dropped. 
- * - * @param config kafka source config - * @param offsetStorageFactory factory to build [[OffsetStorage]] - * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - * @param fetchThread fetches messages and puts on a in-memory queue - * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition - */ -class KafkaSource( - config: KafkaSourceConfig, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder = new DefaultMessageDecoder, - timestampFilter: TimeStampFilter = new DefaultTimeStampFilter, - private var fetchThread: Option[FetchThread] = None, - private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = { - Map.empty[TopicAndPartition, KafkaOffsetManager] - }) extends TimeReplayableSource { - import io.gearpump.streaming.kafka.KafkaSource._ - - private var startTime: Option[TimeStamp] = None - - /** - * Constructs a Kafka Source by... - * - * @param topics comma-separated string of topics - * @param properties kafka consumer config - * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - * - */ - def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = { - this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory) - } - /** - * Constructs a Kafka Source by... 
- * - * @param topics comma-separated string of topics - * @param properties kafka consumer config - * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - */ - def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = { - this(KafkaSourceConfig(properties) - .withConsumerTopics(topics), offsetStorageFactory, - messageDecoder, timestampFilter) - } - - /** - * Constructs a Kafka Source by... - * - * @param topics comma-separated string of topics - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - */ - def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) = - this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory) - - /** - * Constructs a Kafka Source by... 
- * - * @param topics comma-separated string of topics - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - */ - def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter) = { - this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory, - messageDecoder, timestampFilter) - } - - LOG.debug(s"assigned ${offsetManagers.keySet}") - - private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = { - this.startTime = startTime - fetchThread.foreach { fetch => - this.startTime.foreach { time => - offsetManagers.foreach { case (tp, offsetManager) => - offsetManager.resolveOffset(time) match { - case Success(offset) => - LOG.debug(s"set start offset to $offset for $tp") - fetch.setStartOffset(tp, offset) - case Failure(StorageEmpty) => - LOG.debug(s"no previous TimeStamp stored") - case Failure(e) => throw e - } - } - } - fetch.setDaemon(true) - fetch.start() - } - } - - override def open(context: TaskContext, startTime: TimeStamp): Unit = { - import context.{appId, appName, parallelism, taskId} - - val topics = config.getConsumerTopics - val grouper = config.getGrouper - val consumerConfig = config.consumerConfig - val topicAndPartitions = grouper.group(parallelism, taskId.index, - KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), topics)) - this.fetchThread = Some(FetchThread(topicAndPartitions, config.getFetchThreshold, - config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig)) - this.offsetManagers = topicAndPartitions.map { tp => - val storageTopic = 
s"app${appId}_${appName}_${tp.topic}_${tp.partition}" - val storage = offsetStorageFactory.getOffsetStorage(storageTopic) - tp -> new KafkaOffsetManager(storage) - }.toMap - - setStartTime(Option(startTime)) - } - - override def read(): Message = { - fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull - } - - private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = { - val msgOpt = offsetManagers(kafkaMsg.topicAndPartition) - .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset) - msgOpt.flatMap { msg => - startTime match { - case None => - Some(msg) - case Some(time) => - timestampFilter.filter(msg, time) - } - } - } - - override def close(): Unit = { - offsetManagers.foreach(_._2.close()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala deleted file mode 100644 index e50bf84..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka - -import java.util.Properties -import scala.collection.mutable -import scala.util.{Failure, Success, Try} - -import com.twitter.bijection.Injection -import kafka.api.OffsetRequest -import kafka.consumer.ConsumerConfig -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.slf4j.Logger - -import io.gearpump.TimeStamp -import io.gearpump.streaming.kafka.lib.KafkaUtil -import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer -import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} -import io.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory} -import io.gearpump.util.LogUtil - -/** - * Factory that builds [[KafkaStorage]] - * - * @param consumerProps kafka consumer config - * @param producerProps kafka producer config - */ -class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) - extends OffsetStorageFactory { - - /** - * Creates consumer config properties with `zookeeper.connect` set to zkConnect - * and producer config properties with `bootstrap.servers` set to bootstrapServers - * - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param bootstrapServers kafka producer config `bootstrap.servers` - */ - def this(zkConnect: String, bootstrapServers: String) = - this(KafkaUtil.buildConsumerConfig(zkConnect), KafkaUtil.buildProducerConfig(bootstrapServers)) - - override def getOffsetStorage(dir: String): OffsetStorage = { - val topic = dir - val consumerConfig = new ConsumerConfig(consumerProps) - val getConsumer = () => KafkaConsumer(topic, 0, OffsetRequest.EarliestTime, consumerConfig) - new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], Array[Byte]]( - 
producerProps, new ByteArraySerializer, new ByteArraySerializer), - getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)()) - } -} - -object KafkaStorage { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage]) -} - -/** - * Stores offset-timestamp mapping to kafka - * - * @param topic kafka store topic - * @param producer kafka producer - * @param getConsumer function to get kafka consumer - * @param connectZk function to connect zookeeper - */ -class KafkaStorage private[kafka]( - topic: String, - producer: KafkaProducer[Array[Byte], Array[Byte]], - getConsumer: => KafkaConsumer, - connectZk: => ZkClient) - extends OffsetStorage { - - private lazy val consumer = getConsumer - - private val dataByTime: List[(TimeStamp, Array[Byte])] = { - if (KafkaUtil.topicExists(connectZk, topic)) { - load(consumer) - } else { - List.empty[(TimeStamp, Array[Byte])] - } - } - - /** - * Offsets with timestamp less than `time` have already been processed by the system - * so we look up the storage for the first offset with timestamp large equal than `time` - * on replay. 
- * - * @param time the timestamp to look up for the earliest unprocessed offset - * @return the earliest unprocessed offset if `time` is in the range, otherwise failure - */ - override def lookUp(time: TimeStamp): Try[Array[Byte]] = { - if (dataByTime.isEmpty) { - Failure(StorageEmpty) - } else { - val min = dataByTime.head - val max = dataByTime.last - if (time < min._1) { - Failure(Underflow(min._2)) - } else if (time > max._1) { - Failure(Overflow(max._2)) - } else { - Success(dataByTime.find(_._1 >= time).get._2) - } - } - } - - override def append(time: TimeStamp, offset: Array[Byte]): Unit = { - val message = new ProducerRecord[Array[Byte], Array[Byte]]( - topic, 0, Injection[Long, Array[Byte]](time), offset) - producer.send(message) - } - - override def close(): Unit = { - producer.close() - KafkaUtil.deleteTopic(connectZk, topic) - } - - private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = { - var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])] - while (consumer.hasNext) { - val kafkaMsg = consumer.next - kafkaMsg.key.map { k => - Injection.invert[TimeStamp, Array[Byte]](k) match { - case Success(time) => - messagesBuilder += (time -> kafkaMsg.msg) - case Failure(e) => throw e - } - } orElse (throw new RuntimeException("offset key should not be null")) - } - consumer.close() - messagesBuilder.result().toList - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala deleted file mode 100644 index 2a852e2..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala +++ /dev/null @@ -1,53 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.kafka.dsl - -import java.util.Properties - -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.dsl -import io.gearpump.streaming.kafka.KafkaSink - -class KafkaDSLSink[T](stream: dsl.Stream[T]) { - - /** Create a Kafka DSL Sink */ - def writeToKafka( - topic: String, - bootstrapServers: String, - parallism: Int, - description: String): dsl.Stream[T] = { - stream.sink(new KafkaSink(topic, bootstrapServers), parallism, UserConfig.empty, description) - } - - def writeToKafka( - topic: String, - properties: Properties, - parallism: Int, - description: String): dsl.Stream[T] = { - stream.sink(new KafkaSink(topic, properties), parallism, UserConfig.empty, description) - } -} - -object KafkaDSLSink { - - import scala.language.implicitConversions - - implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = { - new KafkaDSLSink[T](stream) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala deleted file mode 100644 index 325b40f..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.kafka.dsl - -import java.util.Properties - -import io.gearpump.streaming.dsl -import io.gearpump.streaming.dsl.StreamApp -import io.gearpump.streaming.kafka.KafkaSource -import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig} -import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} - -object KafkaDSLUtil { - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - kafkaConfig: KafkaSourceConfig, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = { - app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), - parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - zkConnect: String, - offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory), - parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - zkConnect: String, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, - messageDecoder, timestampFilter), parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - properties: Properties, - offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), - parallelism, description) - } - - def createStream[T]( - app: StreamApp, - topics: String, - parallelism: Int, - description: String, - properties: Properties, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - 
timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, - messageDecoder, timestampFilter), parallelism, description) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala deleted file mode 100644 index f846efe..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success} - -import com.twitter.bijection.Injection - -import io.gearpump.Message -import io.gearpump.streaming.transaction.api.MessageDecoder - -class DefaultMessageDecoder extends MessageDecoder { - override def fromBytes(bytes: Array[Byte]): Message = { - Message(bytes, System.currentTimeMillis()) - } -} - -class StringMessageDecoder extends MessageDecoder { - override def fromBytes(bytes: Array[Byte]): Message = { - Injection.invert[String, Array[Byte]](bytes) match { - case Success(s) => Message(s, System.currentTimeMillis()) - case Failure(e) => throw e - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala deleted file mode 100644 index e9c95e3..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success, Try} - -import com.twitter.bijection.Injection -import org.slf4j.Logger - -import io.gearpump._ -import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} -import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage} -import io.gearpump.util.LogUtil - -object KafkaOffsetManager { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager]) -} - -private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager { - import io.gearpump.streaming.kafka.lib.KafkaOffsetManager._ - - var maxTime: TimeStamp = 0L - - override def filter(messageAndOffset: (Message, Long)): Option[Message] = { - val (message, offset) = messageAndOffset - if (message.timestamp > maxTime) { - maxTime = message.timestamp - storage.append(maxTime, Injection[Long, Array[Byte]](offset)) - } - Some(message) - } - - override def resolveOffset(time: TimeStamp): Try[Long] = { - storage.lookUp(time) match { - case Success(offset) => Injection.invert[Long, Array[Byte]](offset) - case Failure(Overflow(max)) => - LOG.warn(s"start time larger than the max stored TimeStamp; set to max offset") - Injection.invert[Long, Array[Byte]](max) - case Failure(Underflow(min)) => - LOG.warn(s"start time less than the min stored TimeStamp; set to min offset") - Injection.invert[Long, Array[Byte]](min) - case Failure(StorageEmpty) => Failure(StorageEmpty) - case Failure(e) => throw e - } - } - - override def close(): Unit = { - storage.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala deleted file mode 100644 index 123f3ac..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import java.util.Properties - -import kafka.api.OffsetRequest -import kafka.consumer.ConsumerConfig -import org.slf4j.Logger - -import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper} -import io.gearpump.util.LogUtil - -object KafkaSourceConfig { - - val NAME = "kafka_config" - - val ZOOKEEPER_CONNECT = "zookeeper.connect" - val GROUP_ID = "group.id" - val CONSUMER_START_OFFSET = "kafka.consumer.start.offset" - val CONSUMER_TOPICS = "kafka.consumer.topics" - val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold" - val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms" - val GROUPER_CLASS = "kafka.grouper.class" - - private val LOG: Logger = LogUtil.getLogger(getClass) - - def apply(consumerProps: Properties): KafkaSourceConfig = new KafkaSourceConfig(consumerProps) -} - -/** - * Extends kafka.consumer.ConsumerConfig with specific config needed by - * [[io.gearpump.streaming.kafka.KafkaSource]] - * - * @param consumerProps kafka consumer config - */ -class KafkaSourceConfig(val consumerProps: Properties = new Properties) - extends java.io.Serializable { - import io.gearpump.streaming.kafka.lib.KafkaSourceConfig._ - - if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) { - consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181") - } - - if (!consumerProps.containsKey(GROUP_ID)) { - consumerProps.setProperty(GROUP_ID, "gearpump") - } - - def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps) - - /** - * Set kafka consumer topics, seperated by comma. 
- * - * @param topics comma-separated string - * @return new KafkaConfig based on this but with - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]] - * set to given value - */ - def withConsumerTopics(topics: String): KafkaSourceConfig = { - consumerProps.setProperty(CONSUMER_TOPICS, topics) - KafkaSourceConfig(consumerProps) - } - - /** - * Returns a list of kafka consumer topics - */ - def getConsumerTopics: List[String] = { - Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList - } - - /** - * Sets the sleep interval if there are no more message or message buffer is full. - * - * Consumer.FetchThread will sleep for a while if no more messages or - * the incoming queue size is above the - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] - * - * @param sleepMS sleep interval in milliseconds - * @return new KafkaConfig based on this but with - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] set to given value - */ - def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = { - consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS") - KafkaSourceConfig(consumerProps) - } - - /** - * Gets the sleep interval - * - * Consumer.FetchThread sleeps for a while if no more messages or - * the incoming queue is full (size is bigger than the - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]) - * - * @return sleep interval in milliseconds - */ - def getFetchSleepMS: Int = { - Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt - } - - /** - * Sets the batch size we use for one fetch. 
- * - * Consumer.FetchThread stops fetching new messages if its incoming queue - * size is above the threshold and starts again when the queue size is below it - * - * @param threshold queue size - * @return new KafkaConfig based on this but with - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] set to give value - */ - def withFetchThreshold(threshold: Int): KafkaSourceConfig = { - consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold") - KafkaSourceConfig(consumerProps) - } - - /** - * Returns fetch batch size. - * - * Consumer.FetchThread stops fetching new messages if - * its incoming queue size is above the threshold and starts again when the queue size is below it - * - * @return fetch threshold - */ - def getFetchThreshold: Int = { - Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt - } - - /** - * Sets [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which - * defines how kafka.common.TopicAndPartitions are mapped to source tasks. 
- * - * @param className name of the factory class - * @return new KafkaConfig based on this but with - * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] set to given value - */ - def withGrouper(className: String): KafkaSourceConfig = { - consumerProps.setProperty(GROUPER_CLASS, className) - KafkaSourceConfig(consumerProps) - } - - /** - * Returns [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which - * defines how kafka.common.TopicAndPartitions are mapped to source tasks - */ - def getGrouper: KafkaGrouper = { - Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)) - .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper] - } - - def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = { - consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest") - KafkaSourceConfig(consumerProps) - } - - def getConsumerStartOffset: Long = { - Option(consumerProps.getProperty(CONSUMER_START_OFFSET)) - .getOrElse(s"${OffsetRequest.EarliestTime}").toLong - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala deleted file mode 100644 index 2f7fcf7..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib - -import java.io.InputStream -import java.util.Properties - -import kafka.admin.AdminUtils -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.consumer.ConsumerConfig -import kafka.utils.{ZKStringSerializer, ZkUtils} -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} -import org.apache.kafka.common.serialization.Serializer -import org.slf4j.Logger - -import io.gearpump.util.LogUtil - -object KafkaUtil { - private val LOG: Logger = LogUtil.getLogger(getClass) - - def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker = { - val zkClient = connectZk - try { - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - .getOrElse(throw new RuntimeException( - s"leader not available for TopicAndPartition($topic, $partition)")) - ZkUtils.getBrokerInfo(zkClient, leader) - .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader")) - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]) - : Array[TopicAndPartition] = { - val zkClient = connectZk - try { - ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap { - case (topic, partitions) => 
partitions.map(TopicAndPartition(topic, _)) - }.toArray - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def topicExists(connectZk: => ZkClient, topic: String): Boolean = { - val zkClient = connectZk - try { - AdminUtils.topicExists(zkClient, topic) - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - /** - * create a new kafka topic - * return true if topic already exists, and false otherwise - */ - def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int) - : Boolean = { - val zkClient = connectZk - try { - if (AdminUtils.topicExists(zkClient, topic)) { - LOG.info(s"topic $topic exists") - true - } else { - AdminUtils.createTopic(zkClient, topic, partitions, replicas) - LOG.info(s"created topic $topic") - false - } - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def deleteTopic(connectZk: => ZkClient, topic: String): Unit = { - val zkClient = connectZk - try { - AdminUtils.deleteTopic(zkClient, topic) - } catch { - case e: Exception => - LOG.error(e.getMessage) - } finally { - zkClient.close() - } - } - - def connectZookeeper(config: ConsumerConfig): () => ZkClient = { - val zookeeperConnect = config.zkConnect - val sessionTimeout = config.zkSessionTimeoutMs - val connectionTimeout = config.zkConnectionTimeoutMs - () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, ZKStringSerializer) - } - - def loadProperties(filename: String): Properties = { - val props = new Properties() - var propStream: InputStream = null - try { - propStream = getClass.getClassLoader.getResourceAsStream(filename) - props.load(propStream) - } catch { - case e: Exception => - LOG.error(s"$filename not found") - } finally { - if (propStream != null) { - propStream.close() - } - } - props - } - - def createKafkaProducer[K, V](properties: Properties, - 
keySerializer: Serializer[K], - valueSerializer: Serializer[V]): KafkaProducer[K, V] = { - if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - } - new KafkaProducer[K, V](properties, keySerializer, valueSerializer) - } - - def buildProducerConfig(bootstrapServers: String): Properties = { - val properties = new Properties() - properties.setProperty("bootstrap.servers", bootstrapServers) - properties - } - - def buildConsumerConfig(zkConnect: String): Properties = { - val properties = new Properties() - properties.setProperty("zookeeper.connect", zkConnect) - properties.setProperty("group.id", "gearpump") - properties - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala deleted file mode 100644 index 141ae98..0000000 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib.consumer - -/** - * someone sleeps for exponentially increasing duration each time - * until the cap - * - * @param backOffMultiplier The factor by which the duration increases. - * @param initialDurationMs Time in milliseconds for initial sleep. - * @param maximumDurationMs Cap up to which we will increase the duration. - */ -private[consumer] class ExponentialBackoffSleeper( - backOffMultiplier: Double = 2.0, - initialDurationMs: Long = 100, - maximumDurationMs: Long = 10000) { - - require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1") - require(initialDurationMs > 0, "initialDurationMs must be positive") - require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs") - - private var sleepDuration = initialDurationMs - - def reset(): Unit = { - sleepDuration = initialDurationMs - } - - def sleep(): Unit = { - Thread.sleep(sleepDuration) - setNextSleepDuration() - } - - def getSleepDuration: Long = sleepDuration - - def setNextSleepDuration(): Unit = { - val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long] - sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.gearpump.streaming.kafka.lib.consumer

import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.LinkedBlockingQueue

import scala.util.control.NonFatal

import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import org.slf4j.Logger

import io.gearpump.util.LogUtil

object FetchThread {
  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])

  /**
   * Factory that wires a [[FetchThread]] with one
   * [[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] per TopicAndPartition
   * and a fresh unbounded incoming queue.
   *
   * @param topicAndPartitions partitions this thread fetches from
   * @param fetchThreshold queue size above which fetching pauses
   * @param fetchSleepMS sleep interval when idle or above threshold
   * @param startOffsetTime kafka offset time to start consuming from
   * @param consumerConfig kafka consumer configuration
   */
  def apply(topicAndPartitions: Array[TopicAndPartition],
      fetchThreshold: Int,
      fetchSleepMS: Long,
      startOffsetTime: Long,
      consumerConfig: ConsumerConfig): FetchThread = {
    val createConsumer = (tp: TopicAndPartition) =>
      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)

    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
  }
}

/**
 * A thread to fetch messages from multiple kafka org.apache.kafka.TopicAndPartition and puts them
 * onto a queue, which is asynchronously polled by a consumer
 *
 * @param createConsumer given a org.apache.kafka.TopicAndPartition, create a
 *                       [[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
 *                       connect to it
 * @param incomingQueue a queue to buffer incoming messages
 * @param fetchThreshold above which thread should stop fetching messages
 * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
 */
private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
    createConsumer: TopicAndPartition => KafkaConsumer,
    incomingQueue: LinkedBlockingQueue[KafkaMessage],
    fetchThreshold: Int,
    fetchSleepMS: Long) extends Thread {
  import io.gearpump.streaming.kafka.lib.consumer.FetchThread._

  private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers

  /** Overrides the offset the given partition's consumer will fetch from next. */
  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
    consumers(tp).setStartOffset(startOffset)
  }

  /** Non-blocking dequeue of a buffered message; `None` when the queue is empty. */
  def poll: Option[KafkaMessage] = Option(incomingQueue.poll())

  override def run(): Unit = {
    try {
      var nextOffsets = Map.empty[TopicAndPartition, Long]
      var reset = false
      val sleeper = new ExponentialBackoffSleeper(
        backOffMultiplier = 2.0,
        initialDurationMs = 100L,
        maximumDurationMs = 10000L)
      while (!Thread.currentThread().isInterrupted) {
        try {
          if (reset) {
            // Strict map (NOT the lazy `mapValues` view): offsets must be read
            // from the old consumers BEFORE resetConsumers closes them. A lazy
            // view would defer `getNextOffset` until after close().
            nextOffsets = consumers.map { case (tp, consumer) => tp -> consumer.getNextOffset }
            resetConsumers(nextOffsets)
            reset = false
          }
          val hasMoreMessages = fetchMessage
          sleeper.reset()
          if (!hasMoreMessages) {
            Thread.sleep(fetchSleepMS)
          }
        } catch {
          // NonFatal excludes InterruptedException, so an interrupt raised
          // during Thread.sleep propagates to the outer handler and shuts the
          // thread down. The previous `case e: Exception` swallowed it here,
          // cleared the interrupt flag, and kept the thread running forever.
          case NonFatal(e) =>
            LOG.warn(s"resetting consumers due to $e")
            reset = true
            sleeper.sleep()
        }
      }
    } catch {
      case e: InterruptedException => LOG.info("fetch thread got interrupted exception")
      case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception")
    } finally {
      consumers.values.foreach(_.close())
    }
  }

  /**
   * fetch message from each TopicAndPartition in a round-robin way
   *
   * @return whether any partition may still have more messages to fetch
   */
  def fetchMessage: Boolean = {
    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
      val (_, consumer) = tpAndConsumer
      if (incomingQueue.size < fetchThreshold) {
        if (consumer.hasNext) {
          incomingQueue.put(consumer.next())
          true
        } else {
          hasNext
        }
      } else {
        // Above threshold: report "more available" so the loop backs off
        // with fetchSleepMS instead of spinning.
        true
      }
    }
  }

  /** Builds one consumer per configured TopicAndPartition. */
  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
  }

  /** Closes all consumers and recreates them, seeking each to its saved offset. */
  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
    consumers.values.foreach(_.close())
    consumers = createAllConsumers
    consumers.foreach { case (tp, consumer) =>
      consumer.setStartOffset(nextOffsets(tp))
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.gearpump.streaming.kafka.lib.consumer

import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.common.ErrorMapping._
import kafka.common.TopicAndPartition
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.message.MessageAndOffset
import kafka.utils.Utils

import io.gearpump.streaming.kafka.lib.KafkaUtil

object KafkaConsumer {
  /**
   * Creates a [[KafkaConsumer]] for a single topic partition by looking up the
   * partition's broker through zookeeper and opening a kafka SimpleConsumer to it.
   *
   * @param topic kafka topic to consume
   * @param partition partition of the topic
   * @param startOffsetTime kafka offset time passed to earliestOrLatestOffset
   * @param config kafka consumer config (socket timeouts, fetch size, client id)
   */
  def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
    : KafkaConsumer = {
    // NOTE(review): connectZk() appears to create a zookeeper client that is
    // never closed in this method — verify KafkaUtil manages its lifecycle.
    val connectZk = KafkaUtil.connectZookeeper(config)
    val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
    val soTimeout = config.socketTimeoutMs
    val soBufferSize = config.socketReceiveBufferBytes
    val fetchSize = config.fetchMessageMaxBytes
    val clientId = config.clientId
    val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId)
    // Issues one fetch request starting at `offset` and returns an iterator
    // over the returned message set; kafka errors are rethrown as exceptions.
    val getIterator = (offset: Long) => {
      val request = new FetchRequestBuilder()
        .addFetch(topic, partition, offset, fetchSize)
        .build()

      val response = consumer.fetch(request)
      response.errorCode(topic, partition) match {
        case NoError => response.messageSet(topic, partition).iterator
        case error => throw exceptionFor(error)
      }
    }
    new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
  }
}

/**
 * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over
 * messages from a kafka kafka.common.TopicAndPartition.
 *
 * Not thread-safe: `nextOffset` and `iterator` are unsynchronized mutable state,
 * so `next`/`hasNext`/`setStartOffset` must be called from a single thread.
 */
class KafkaConsumer(consumer: SimpleConsumer,
    topic: String,
    partition: Int,
    getIterator: (Long) => Iterator[MessageAndOffset],
    startOffsetTime: Long = OffsetRequest.EarliestTime) {
  // Resolve the initial offset for `startOffsetTime` once, at construction.
  private val earliestOffset = consumer
    .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
  // Offset the next fetch will start from; advanced by next().
  private var nextOffset: Long = earliestOffset
  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)

  /** Re-seeks this consumer: subsequent messages are fetched from `startOffset`. */
  def setStartOffset(startOffset: Long): Unit = {
    nextOffset = startOffset
    iterator = getIterator(nextOffset)
  }

  /**
   * Returns the next message wrapped as [[KafkaMessage]] and advances
   * `nextOffset`. Call `hasNext` first; delegates to `iterator.next()`.
   */
  def next(): KafkaMessage = {
    val mo = iterator.next()
    val message = mo.message

    nextOffset = mo.nextOffset

    val offset = mo.offset
    val payload = Utils.readBytes(message.payload)
    // message.key may be null for unkeyed messages; Option() maps that to None.
    new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload)
  }

  /**
   * Whether a message is available. If the current iterator is exhausted,
   * fetches a fresh one from `nextOffset` exactly once before answering false.
   */
  def hasNext: Boolean = {
    @annotation.tailrec
    def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = {
      if (iter.hasNext) true
      else if (newIterator) false
      else {
        // Exhausted: try one re-fetch from the current offset.
        iterator = getIterator(nextOffset)
        hasNextHelper(iterator, newIterator = true)
      }
    }
    hasNextHelper(iterator, newIterator = false)
  }

  /** Offset of the next message to be fetched. */
  def getNextOffset: Long = nextOffset

  /** Closes the underlying SimpleConsumer connection. */
  def close(): Unit = {
    consumer.close()
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.gearpump.streaming.kafka.lib.consumer

import kafka.common.TopicAndPartition

/**
 * Immutable wrapper for a single message consumed from kafka.
 *
 * @param topicAndPartition the topic partition the message was read from
 * @param offset the message's offset within that partition
 * @param key optional message key bytes; `None` for unkeyed messages
 * @param msg raw message payload bytes
 */
case class KafkaMessage(
    topicAndPartition: TopicAndPartition,
    offset: Long,
    key: Option[Array[Byte]],
    msg: Array[Byte]) {

  /** Convenience constructor building the TopicAndPartition from its parts. */
  def this(topic: String, partition: Int, offset: Long,
      key: Option[Array[Byte]], msg: Array[Byte]) = {
    this(TopicAndPartition(topic, partition), offset = offset, key = key, msg = msg)
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.gearpump.streaming.kafka.lib.grouper

import kafka.common.TopicAndPartition

/**
 * Default grouper that distributes TopicAndPartitions among StreamProducers
 * round-robin by position: task `i` of `n` receives every element whose index
 * modulo `n` equals `i`.
 *
 * e.g. given the array [(topicA, p1), (topicB, p1), (topicA, p2),
 * (topicB, p2), (topicB, p3)] and 2 streamProducers:
 *
 * streamProducer0 gets (topicA, p1), (topicA, p2) and (topicB, p3)
 * streamProducer1 gets (topicB, p1), (topicB, p2)
 */
class KafkaDefaultGrouper extends KafkaGrouper {
  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
    : Array[TopicAndPartition] = {
    // Keep every element whose position falls on this task's stride.
    topicAndPartitions.zipWithIndex
      .collect { case (tp, i) if i % taskNum == taskIndex => tp }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.gearpump.streaming.kafka.lib.grouper

import kafka.common.TopicAndPartition

/**
 * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks
 */
trait KafkaGrouper {
  /**
   * Selects the subset of partitions owned by one task.
   *
   * @param taskNum total number of tasks sharing the partitions
   * @param taskIndex index of the task being assigned (0-based)
   * @param topicAndPartitions all partitions to distribute
   * @return the partitions assigned to task `taskIndex`
   */
  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
    : Array[TopicAndPartition]
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext

/**
 * kafka sink connectors that invokes org.apache.kafka.clients.producer.KafkaProducer to send
 * messages to kafka queue
 * @param getProducer is a function to construct a KafkaProducer
 * @param topic is the kafka topic to write to
 */
class KafkaSink private[kafka](
    getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) extends DataSink {

  // KafkaProducer is not serializable, so it is built lazily on the task host
  // rather than captured at construction time.
  private lazy val producer = getProducer()

  /**
   * @param topic producer topic
   * @param properties producer config
   */
  def this(topic: String, properties: Properties) = {
    this(() => KafkaUtil.createKafkaProducer(properties,
      new ByteArraySerializer, new ByteArraySerializer), topic)
  }

  /**
   * creates an empty properties with `bootstrap.servers` set to `bootstrapServers`
   * and invokes `KafkaSink(topic, properties)`
   * @param topic producer topic
   * @param bootstrapServers kafka producer config `bootstrap.servers`
   */
  def this(topic: String, bootstrapServers: String) = {
    this(topic, KafkaUtil.buildProducerConfig(bootstrapServers))
  }

  override def open(context: TaskContext): Unit = {}

  /**
   * Sends one message to kafka. A 2-tuple payload is treated as (key, value)
   * byte arrays; any other payload is sent un-keyed as value bytes.
   */
  override def write(message: Message): Unit = {
    val record = message.msg match {
      case (key, value) =>
        val keyBytes = key.asInstanceOf[Array[Byte]]
        val valueBytes = value.asInstanceOf[Array[Byte]]
        new ProducerRecord[Array[Byte], Array[Byte]](topic, keyBytes, valueBytes)
      case value =>
        new ProducerRecord[Array[Byte], Array[Byte]](topic, value.asInstanceOf[Array[Byte]])
    }
    producer.send(record)
  }

  /** Flushes pending sends and releases the underlying producer. */
  override def close(): Unit = {
    producer.close()
  }
}
