Hi, sorry for missing this email. Since this is a known issue, I am cc'ing the dev mailing list to reduce its impact on other users.
This is a bug in RocketMQ-Spark that was fixed last year (details: https://github.com/apache/rocketmq-externals/pull/36); upgrading to the latest version resolves it. BTW, RocketMQ-Spark will go through a round of optimization and improvement soon, to provide RocketMQ users with a more stable and reliable big-data integration component. Thanks.

On Oct 14, 2017, at 7:53 PM, Yingjun Li 李颖俊 <[email protected]> wrote:
> Hi,
> I have recently been using the spark-streaming plugin in the
> apache/rocketmq-externals project, and I learned on GitHub that you
> implemented and merged the consumer push mode. May I trouble you with a
> few questions? My runtime environment is in the attachment.
>
> JavaInputDStream<Message> stream = RocketMqUtils.createJavaReliableMQPushStream(
>         jscc, pushConsumerProperties, StorageLevel.MEMORY_AND_DISK_SER());
>
> I am using the reliable push mode. Messages do come out of RocketMQ and
> are stored into the queue (BlockingQueue<MessageSet> queue), but at this
> step:
>
> try {
>     // According to the official docs
>     // 'To implement a reliable receiver, you have to use
>     // store(multiple-records) to store data'
>     ReliableRocketMQReceiver.this.store(messageSet);
>     ack(messageSet.getId());
> } catch (Exception e) {
>     fail(messageSet.getId());
> }
>
> the MessageSender thread gets stuck, and after a while an OOM error
> occurs, as follows:
>
> java.lang.OutOfMemoryError: Java heap space
>     at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:177)
>     at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scala:176)
>     at org.apache.spark.util.collection.PrimitiveVector.copyArrayWithLength(PrimitiveVector.scala:87)
>     at org.apache.spark.util.collection.PrimitiveVector.resize(PrimitiveVector.scala:74)
>     at org.apache.spark.util.collection.SizeTrackingVector.resize(SizeTrackingVector.scala:35)
>     at org.apache.spark.util.collection.PrimitiveVector.$plus$eq(PrimitiveVector.scala:41)
>     at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:30)
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
>     at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:792)
>     at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:84)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:138)
>     at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>     at org.apache.rocketmq.spark.streaming.ReliableRocketMQReceiver$MessageSender.run(ReliableRocketMQReceiver.java:119)
>
> A heap dump shows an excessive number of List<Object> instances, even
> though I only inserted 500 records into RocketMQ:
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:          5005      537537816  [Ljava.lang.Object;
>    2:          1664      135648072  [B
>    3:         30846        2588360  [C
>
> I am stuck for now; my only guess is that it might be related to the
> private List<MessageExt> data field inside MessageSet.
> If you see this email and have the time, I would really appreciate your
> help. Thanks!
>
> 李颖俊
>
> 2017.10.14

-- 
Thanks,
Xin
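For readers following the thread: the symptom described above (the sender thread stalling while the heap fills with buffered message lists) is the classic failure mode of an unbounded in-memory buffer sitting between a fast producer and a stalled consumer. As a hedged, self-contained illustration only — this is neither the actual ReliableRocketMQReceiver code nor the fix in the linked PR — the following Java sketch shows how a *bounded* BlockingQueue applies backpressure to the producer instead of growing without limit:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedBufferSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue: put() blocks when full, so a stalled consumer
        // slows the producer down instead of letting the heap grow.
        BlockingQueue<byte[]> bounded = new ArrayBlockingQueue<>(4);

        // Fill the queue to capacity (the "consumer" never drains it here,
        // simulating a stuck downstream store()).
        for (int i = 0; i < 4; i++) {
            bounded.put(new byte[1024]);
        }

        // offer() with a timeout fails cleanly when the buffer is full;
        // an unbounded queue would instead keep allocating until OOM.
        boolean accepted = bounded.offer(new byte[1024], 100, TimeUnit.MILLISECONDS);
        System.out.println("accepted=" + accepted);     // prints accepted=false
        System.out.println("queued=" + bounded.size()); // prints queued=4
    }
}
```

Whether this is the mechanism behind the reported OOM is for the maintainers to confirm; the sketch only demonstrates why bounding receiver-side buffers matters.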
