sanathkumar prasanna created FLUME-3071:
-------------------------------------------

             Summary: Multiplex Channel Selector is not working properly at flume 1.7 version
                 Key: FLUME-3071
                 URL: https://issues.apache.org/jira/browse/FLUME-3071
             Project: Flume
          Issue Type: Request
          Components: Configuration
    Affects Versions: v1.7.0
         Environment: Environment Details
Hadoop 2.5.2
Flume 1.7
Hive 0.14
            Reporter: sanathkumar prasanna
             Fix For: v1.7.0


Environment Details
I am using the multiplexing channel selector to load data into two different 
Hive sinks, but the agent gets stuck once the source has started. No error is 
logged after that point, and no data is loaded.


#Source
test_interceptors.sources = RTI
test_interceptors.channels = RTI_Channel1 RTI_Channel2
test_interceptors.sinks = RTI_to_hive1 RTI_to_hive1

test_interceptors.sources.RTI.channels = RTI_Channel1 RTI_Channel2


#test_interceptors.sinks.RTI_to_hive3.channel = RTI_Channel3



test_interceptors.sources.RTI.type = TAILDIR
test_interceptors.sources.RTI.positionFile=/home/retailteg/flume/taildir_position.json
test_interceptors.sources.RTI.filegroups=f1
test_interceptors.sources.RTI.filegroups.f1=/home/retailteg/flume/flumeSpool/initial_insert
test_interceptors.sources.RTI.filegroups.f1.headerkey1=value1
test_interceptors.sources.RTI.fileHeader = false
test_interceptors.sources.RTI.skipToEnd = false


test_interceptors.sources.RTI.selector.type = multiplexing
test_interceptors.sources.RTI.selector.header=table_name
test_interceptors.sources.RTI.selector.mapping.Table1=RTI_Channel1
test_interceptors.sources.RTI.selector.mapping.Table2= RTI_Channel2
test_interceptors.sources.RTI.selector.default= RTI_Channel2

#Channel
test_interceptors.channels.RTI_Channel1.type = memory
test_interceptors.channels.RTI_Channel1.capacity = 100000
test_interceptors.channels.RTI_Channel1.transactionCapacity = 100000
test_interceptors.channels.RTI_Channel2.type = memory
test_interceptors.channels.RTI_Channel2.capacity = 100000
test_interceptors.channels.RTI_Channel2.transactionCapacity = 100000



#Sinks
test_interceptors.sinks.RTI_to_hive1.type = hive
test_interceptors.sinks.RTI_to_hive1.hive.metastore = thrift://172.20.180.64:9083
test_interceptors.sinks.RTI_to_hive1.hive.database = default
test_interceptors.sinks.RTI_to_hive1.hive.table = RTI_Test_Feb2017_8
test_interceptors.sinks.RTI_to_hive1.hive.partition = %Y-%m-%d
test_interceptors.sinks.RTI_to_hive1.useLocalTimeStamp = true
test_interceptors.sinks.RTI_to_hive1.round = true
test_interceptors.sinks.RTI_to_hive1.roundValue = 10
test_interceptors.sinks.RTI_to_hive1.roundUnit = minute 
test_interceptors.sinks.RTI_to_hive1.serializer = DELIMITED
test_interceptors.sinks.RTI_to_hive1.serializer.delimiter = ","
test_interceptors.sinks.RTI_to_hive1.serializer.serdeSeparator = ','
test_interceptors.sinks.RTI_to_hive1.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag




test_interceptors.sinks.RTI_to_hive2.type = hive
test_interceptors.sinks.RTI_to_hive2.hive.metastore = thrift://172.20.180.64:9083
test_interceptors.sinks.RTI_to_hive2.hive.database = default
test_interceptors.sinks.RTI_to_hive2.hive.table = RTI_Test_Feb2017_9
test_interceptors.sinks.RTI_to_hive2.hive.partition = %Y-%m-%d
test_interceptors.sinks.RTI_to_hive2.useLocalTimeStamp = true
test_interceptors.sinks.RTI_to_hive2.round = true
test_interceptors.sinks.RTI_to_hive2.roundValue = 10
test_interceptors.sinks.RTI_to_hive2.roundUnit = minute 
test_interceptors.sinks.RTI_to_hive2.serializer = DELIMITED
test_interceptors.sinks.RTI_to_hive2.serializer.delimiter = ","
test_interceptors.sinks.RTI_to_hive2.serializer.serdeSeparator = ','
test_interceptors.sinks.RTI_to_hive2.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag




test_interceptors.sinks.RTI_to_hive1.channel = RTI_Channel1
test_interceptors.sinks.RTI_to_hive2.channel = RTI_Channel2



Log file contents:


2017-03-10 19:44:18,770 (main) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:144)] 
Starting Channel RTI_Channel1
2017-03-10 19:44:18,773 (main) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:144)] 
Starting Channel RTI_Channel2
2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]
 Monitored counter group for type: CHANNEL, name: RTI_Channel1: Successfully 
registered new MBean.
2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]
 Component type: CHANNEL, name: RTI_Channel1 started
2017-03-10 19:44:18,776 (main) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:159)] 
Waiting for channel: RTI_Channel2 to start. Sleeping for 500 ms
2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]
 Monitored counter group for type: CHANNEL, name: RTI_Channel2: Successfully 
registered new MBean.
2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]
 Component type: CHANNEL, name: RTI_Channel2 started
2017-03-10 19:44:19,276 (main) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:171)] 
Starting Sink RTI_to_hive1
2017-03-10 19:44:19,277 (main) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:182)] 
Starting Source RTI
2017-03-10 19:44:19,277 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:92)] RTI 
TaildirSource source starting with directory: 
{f1=/home/retailteg/flume/flumeSpool/initial_insert}
2017-03-10 19:44:19,281 (lifecycleSupervisor-1-5) [DEBUG - 
org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:75)]
 Initializing ReliableTaildirEventReader with 
directory={f1=/home/retailteg/flume/flumeSpool/initial_insert}, metaDir={}
2017-03-10 19:44:19,283 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:83)]
 taildirCache: [{filegroup='f1', 
filePattern='/home/retailteg/flume/flumeSpool/initial_insert', cached=true}]
2017-03-10 19:44:19,285 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:84)]
 headerTable: {}
2017-03-10 19:44:19,289 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)]
 Opening file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, 
pos: 0
2017-03-10 19:44:19,290 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:94)]
 Updating position from position file: 
/home/retailteg/flume/taildir_position.json
2017-03-10 19:44:19,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling 
sink runner starting
2017-03-10 19:44:19,362 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.source.taildir.TailFile.updatePos(TailFile.java:126)] Updated 
position, file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 
9970903, pos: 884
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [DEBUG - 
org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:118)] 
TaildirSource started
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]
 Monitored counter group for type: SOURCE, name: RTI: Successfully registered 
new MBean.
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]
 Component type: SOURCE, name: RTI started
2017-03-10 19:44:19,366 (PollableSourceRunner-TaildirSource-RTI) [DEBUG - 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:127)]
 Polling runner starting. Source:Taildir source: { positionFile: 
/home/retailteg/flume/taildir_position.json, skipToEnd: false, 
byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 }
2017-03-10 19:46:19,459 (PollableSourceRunner-TaildirSource-RTI) [INFO - 
org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)]
 Closed file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, 
pos: 884
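
Two details in the configuration and log above may be worth checking; this is a hedged reading, not a confirmed diagnosis. The sinks list names RTI_to_hive1 twice and the log shows only "Starting Sink RTI_to_hive1", so RTI_to_hive2 may never be started and RTI_Channel2 (the selector default) may have no consumer. The log also prints "headerTable: {}", and the Flume user guide documents TAILDIR headers as headers.<filegroupName>.<headerKey> rather than filegroups.f1.headerkey1, so events may carry no table_name header for the selector to match. A minimal sketch of the two lines, assuming the duplicate sink name is a typo and that every line in file group f1 belongs to Table1 (an assumption made only for illustration):

```properties
# Sketch only: list both Hive sinks (the posted line names RTI_to_hive1 twice).
test_interceptors.sinks = RTI_to_hive1 RTI_to_hive2

# Assumption: all lines in file group f1 belong to Table1, so a static header
# is enough. TAILDIR reads per-group headers from headers.<group>.<key>.
test_interceptors.sources.RTI.headers.f1.table_name = Table1
```

With a single static header like this, every event from f1 would route to RTI_Channel1. Routing per record would need the table_name header to be set per event, for example by an interceptor.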




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
