[ 
https://issues.apache.org/jira/browse/FLUME-2956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290370#comment-16290370
 ] 

He Jiang edited comment on FLUME-2956 at 12/14/17 5:51 AM:
-----------------------------------------------------------

Sure. Basically, the original code (HiveSink.java, line 314) only sends heartbeats 
for activeWriters, i.e. the collection of writers that have just consumed some 
events. My modification gives the inactive writers a chance to send a heartbeat if 
one is needed.

Here's the original code piece (HiveSink.java, line 314):
      // 5) Flush all Writers
      for (HiveWriter writer : activeWriters.values()) {
        writer.flush(true);
      }

And here's the modified version:
      // 5) Flush all Writers
      for (HiveWriter writer : allWriters.values()) {
        if (activeWriters.values().contains(writer)) {
          writer.flush(true);
        } else {
          writer.sendHeartbeatIfNeeded();
        }
      }
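
To make the effect of the change easier to see outside of Flume, here is a minimal, self-contained sketch of the same flush-or-heartbeat loop. It is not the actual HiveSink code: StubWriter is a hypothetical stand-in for HiveWriter that only records which method was called, and the String keys stand in for whatever key type the real maps use. The sketch also assumes both maps share the same keys, so it checks membership with containsKey() rather than scanning activeWriters.values() for every writer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSketch {

    /** Hypothetical stand-in for HiveWriter; it only records the calls made on it. */
    static class StubWriter {
        final List<String> calls = new ArrayList<>();

        void flush(boolean rollToNext) {
            calls.add("flush");
        }

        void sendHeartbeatIfNeeded() {
            calls.add("heartbeat");
        }
    }

    /**
     * Flushes writers that consumed events in this batch and gives every
     * other (idle) writer a chance to send a heartbeat.
     * Assumption: both maps use the same keys for the same writer.
     */
    static void flushOrHeartbeat(Map<String, StubWriter> allWriters,
                                 Map<String, StubWriter> activeWriters) {
        for (Map.Entry<String, StubWriter> entry : allWriters.entrySet()) {
            if (activeWriters.containsKey(entry.getKey())) {
                entry.getValue().flush(true);
            } else {
                entry.getValue().sendHeartbeatIfNeeded();
            }
        }
    }

    public static void main(String[] args) {
        StubWriter busy = new StubWriter();
        StubWriter idle = new StubWriter();

        Map<String, StubWriter> allWriters = new LinkedHashMap<>();
        allWriters.put("partition-2016", busy);
        allWriters.put("partition-2017", idle);

        // Only the first writer received events in this batch.
        Map<String, StubWriter> activeWriters = new LinkedHashMap<>();
        activeWriters.put("partition-2016", busy);

        flushOrHeartbeat(allWriters, activeWriters);

        System.out.println("busy writer calls: " + busy.calls);
        System.out.println("idle writer calls: " + idle.calls);
    }
}
```

With the original loop, the idle writer would not be touched at all, which is why its transaction batch could time out during long quiet periods.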

Please see attachment for the modified jar or get it here: 
[^flume-hive-sink-1.8.0.jar]


> hive sink not sending heartbeat correctly
> -----------------------------------------
>
>                 Key: FLUME-2956
>                 URL: https://issues.apache.org/jira/browse/FLUME-2956
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.5.2
>         Environment: linux CentOS 6.6
>            Reporter: Olivier brobecker
>         Attachments: flume-hive-sink-1.8.0.jar
>
>
> Flume has been configured to populate a Hive table as follows:
> agentSCDR.sinks.hive1.type = hive
> agentSCDR.sinks.hive1.channel = channel1
> agentSCDR.sinks.hive1.hive.metastore = thrift://myserver:9083
> agentSCDR.sinks.hive1.hive.txnsPerBatchAsk = 10
> agentSCDR.sinks.hive1.hive.database = myDatabase
> agentSCDR.sinks.hive1.hive.table = my_table
> agentSCDR.sinks.hive1.hive.partition = %Y
> agentSCDR.sinks.hive1.heartBeatInterval = 60
> agentSCDR.sinks.hive1.serializer = DELIMITED
> agentSCDR.sinks.hive1.serializer.delimiter = ;
> agentSCDR.sinks.hive1.serializer.serdeSeparator = ;
> agentSCDR.sinks.hive1.serializer.fieldnames = field1,field2,field3...
> My dataflow is irregular at best and I can have more than 30 minutes of 
> inactivity, so I have set heartBeatInterval to 60s in order to keep my txn 
> alive.
> The issue is that this heartbeat is only sent when Flume is trying to inject 
> some data into Hive, instead of being sent every 60s.
> # grep -i heartbeat flume-agentSCDR.log
> 15 juil. 2016 13:40:43,008 INFO  [hive-hive1-call-runner-0] 
> (org.apache.flume.sink.hive.HiveWriter$2.call:238)  - Sending heartbeat on 
> batch TxnIds=[3755...3764] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:12:21,001 INFO  [hive-hive1-call-runner-0] 
> (org.apache.flume.sink.hive.HiveWriter$2.call:231)  - Sending heartbeat on 
> batch TxnIds=[3785...3794] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:27:56,963 INFO  [hive-hive1-call-runner-0] 
> (org.apache.flume.sink.hive.HiveWriter$2.call:231)  - Sending heartbeat on 
> batch TxnIds=[3795...3804] on endPoint = {metaStoreUri=...
> ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
