[ https://issues.apache.org/jira/browse/FLUME-2956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290370#comment-16290370 ]
He Jiang edited comment on FLUME-2956 at 12/14/17 5:51 AM:
-----------------------------------------------------------

Sure. The original code (HiveSink.java, line 314) only sends heartbeats for activeWriters, that is, the writers that have just consumed events. My modification gives the inactive writers a chance to send a heartbeat when one is needed.

Here's the original code (HiveSink.java, line 314):

    // 5) Flush all Writers
    for (HiveWriter writer : activeWriters.values()) {
      writer.flush(true);
    }

And here's the modified version:

    // 5) Flush all Writers
    for (HiveWriter writer : allWriters.values()) {
      if (activeWriters.values().contains(writer)) {
        writer.flush(true);
      } else {
        writer.sendHeartbeatIfNeeded();
      }
    }

Please see the attachment for the modified jar, or get it here: [^flume-hive-sink-1.8.0.jar]

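For readers who want to try the idea outside Flume, the modified loop can be sketched as a small standalone program. This is only an illustration under stated assumptions: StubWriter is a hypothetical stand-in for HiveWriter that records which method was called, the heartbeatNeeded flag is assumed to be set elsewhere (for example by a timer), and the active set is looked up by map key rather than with values().contains(writer), which assumes both maps share the same keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSketch {

    /** Hypothetical stand-in for HiveWriter; it only records what was called. */
    static class StubWriter {
        final String name;
        boolean heartbeatNeeded; // assumed flag, set elsewhere (e.g. by a timer)
        final List<String> calls = new ArrayList<>();

        StubWriter(String name, boolean heartbeatNeeded) {
            this.name = name;
            this.heartbeatNeeded = heartbeatNeeded;
        }

        // Assumption: a flush also takes care of any pending heartbeat.
        void flush(boolean rollToNext) {
            calls.add("flush");
            heartbeatNeeded = false;
        }

        // Hypothetical counterpart of sendHeartbeatIfNeeded() from the comment.
        void sendHeartbeatIfNeeded() {
            if (heartbeatNeeded) {
                heartbeatNeeded = false;
                calls.add("heartbeat");
            }
        }
    }

    /** Flush writers that received events; let idle writers send a heartbeat. */
    static void flushOrHeartbeat(Map<String, StubWriter> allWriters,
                                 Map<String, StubWriter> activeWriters) {
        for (Map.Entry<String, StubWriter> entry : allWriters.entrySet()) {
            StubWriter writer = entry.getValue();
            if (activeWriters.containsKey(entry.getKey())) {
                writer.flush(true);
            } else {
                writer.sendHeartbeatIfNeeded();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, StubWriter> all = new LinkedHashMap<>();
        Map<String, StubWriter> active = new LinkedHashMap<>();

        StubWriter busy = new StubWriter("busy", true);
        StubWriter idle = new StubWriter("idle", true);
        all.put("busy", busy);
        all.put("idle", idle);
        active.put("busy", busy); // only this writer consumed events

        flushOrHeartbeat(all, active);

        for (StubWriter w : all.values()) {
            System.out.println(w.name + " -> " + w.calls);
        }
    }
}
```

The key lookup avoids scanning activeWriters.values() once per writer, which only matters when many writers are open; the behaviour is otherwise meant to match the modified loop above.
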
> hive sink not sending heartbeat correctly
> -----------------------------------------
>
>                 Key: FLUME-2956
>                 URL: https://issues.apache.org/jira/browse/FLUME-2956
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.5.2
>         Environment: linux CentOS 6.6
>            Reporter: Olivier brobecker
>         Attachments: flume-hive-sink-1.8.0.jar
>
>
> Flume has been configured to populate a Hive table as follows:
> agentSCDR.sinks.hive1.type = hive
> agentSCDR.sinks.hive1.channel = channel1
> agentSCDR.sinks.hive1.hive.metastore = thrift://myserver:9083
> agentSCDR.sinks.hive1.hive.txnsPerBatchAsk = 10
> agentSCDR.sinks.hive1.hive.database = myDatabase
> agentSCDR.sinks.hive1.hive.table = my_table
> agentSCDR.sinks.hive1.hive.partition = %Y
> agentSCDR.sinks.hive1.heartBeatInterval = 60
> agentSCDR.sinks.hive1.serializer = DELIMITED
> agentSCDR.sinks.hive1.serializer.delimiter = ;
> agentSCDR.sinks.hive1.serializer.serdeSeparator = ;
> agentSCDR.sinks.hive1.serializer.fieldnames = field1,field2,field3...
> My dataflow is irregular at best, and I can have more than 30 minutes of
> inactivity, so I set heartBeatInterval to 60s in order to keep my txn
> alive.
> The issue is that the heartbeat is only sent when Flume is trying to write
> data into Hive, instead of once every 60s.
> # grep -i heartbeat flume-agentSCDR.log
> 15 juil. 2016 13:40:43,008 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:238) - Sending heartbeat on
> batch TxnIds=[3755...3764] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:12:21,001 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on
> batch TxnIds=[3785...3794] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:27:56,963 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on
> batch TxnIds=[3795...3804] on endPoint = {metaStoreUri=...
> ...

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)