sanathkumar prasanna created FLUME-3071: -------------------------------------------
Summary: Multiplex Channel Selector is not working properly at flume 1.7 version Key: FLUME-3071 URL: https://issues.apache.org/jira/browse/FLUME-3071 Project: Flume Issue Type: Request Components: Configuration Affects Versions: v1.7.0 Environment: Environment Details Hadoop 2.5.2 Flume 1.7 Hive 0.14 Reporter: sanathkumar prasanna Fix For: v1.7.0 Environment Details I am Using multiplex channel selector to load the data to two different Hive sinks.But it get strucked ai source si started after that didn't get any error.Data also not get loaded here. #Source test_interceptors.sources = RTI test_interceptors.channels = RTI_Channel1 RTI_Channel2 test_interceptors.sinks = RTI_to_hive1 RTI_to_hive1 test_interceptors.sources.RTI.channels = RTI_Channel1 RTI_Channel2 #test_interceptors.sinks.RTI_to_hive3.channel = RTI_Channel3 test_interceptors.sources.RTI.type = TAILDIR test_interceptors.sources.RTI.positionFile=/home/retailteg/flume/taildir_position.json test_interceptors.sources.RTI.filegroups=f1 test_interceptors.sources.RTI.filegroups.f1=/home/retailteg/flume/flumeSpool/initial_insert test_interceptors.sources.RTI.filegroups.f1.headerkey1=value1 test_interceptors.sources.RTI.fileHeader = false test_interceptors.sources.RTI.skipToEnd = false test_interceptors.sources.RTI.selector.type = multiplexing test_interceptors.sources.RTI.selector.header=table_name test_interceptors.sources.RTI.selector.mapping.Table1=RTI_Channel1 test_interceptors.sources.RTI.selector.mapping.Table2= RTI_Channel2 test_interceptors.sources.RTI.selector.default= RTI_Channel2 #Channel test_interceptors.channels.RTI_Channel1.type = memory test_interceptors.channels.RTI_Channel1.capacity = 100000 test_interceptors.channels.RTI_Channel1.transactionCapacity = 100000 test_interceptors.channels.RTI_Channel2.type = memory test_interceptors.channels.RTI_Channel2.capacity = 100000 test_interceptors.channels.RTI_Channel2.transactionCapacity = 100000 #Sinks test_interceptors.sinks.RTI_to_hive1.type = hive test_interceptors.sinks.RTI_to_hive1.hive.metastore = thrift://172.20.180.64:9083 test_interceptors.sinks.RTI_to_hive1.hive.database = default test_interceptors.sinks.RTI_to_hive1.hive.table = RTI_Test_Feb2017_8 test_interceptors.sinks.RTI_to_hive1.hive.partition = %Y-%m-%d test_interceptors.sinks.RTI_to_hive1.useLocalTimeStamp = true test_interceptors.sinks.RTI_to_hive1.round = true test_interceptors.sinks.RTI_to_hive1.roundValue = 10 test_interceptors.sinks.RTI_to_hive1.roundUnit = minute test_interceptors.sinks.RTI_to_hive1.serializer = DELIMITED test_interceptors.sinks.RTI_to_hive1.serializer.delimiter = "," test_interceptors.sinks.RTI_to_hive1.serializer.serdeSeparator = ',' test_interceptors.sinks.RTI_to_hive1.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag test_interceptors.sinks.RTI_to_hive2.type = hive test_interceptors.sinks.RTI_to_hive2.hive.metastore = thrift://172.20.180.64:9083 test_interceptors.sinks.RTI_to_hive2.hive.database = default test_interceptors.sinks.RTI_to_hive2.hive.table = RTI_Test_Feb2017_9 test_interceptors.sinks.RTI_to_hive2.hive.partition = %Y-%m-%d test_interceptors.sinks.RTI_to_hive2.useLocalTimeStamp = true test_interceptors.sinks.RTI_to_hive2.round = true test_interceptors.sinks.RTI_to_hive2.roundValue = 10 test_interceptors.sinks.RTI_to_hive2.roundUnit = minute test_interceptors.sinks.RTI_to_hive2.serializer = DELIMITED test_interceptors.sinks.RTI_to_hive2.serializer.delimiter = "," test_interceptors.sinks.RTI_to_hive2.serializer.serdeSeparator = ',' test_interceptors.sinks.RTI_to_hive2.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag test_interceptors.sinks.RTI_to_hive1.channel = RTI_Channel1 test_interceptors.sinks.RTI_to_hive2.channel = RTI_Channel2 In Log Files Contenet 2017-03-10 19:44:18,770 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel1 2017-03-10 19:44:18,773 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel2 2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel1: Successfully registered new MBean. 2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel1 started 2017-03-10 19:44:18,776 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:159)] Waiting for channel: RTI_Channel2 to start. Sleeping for 500 ms 2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel2: Successfully registered new MBean. 2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel2 started 2017-03-10 19:44:19,276 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink RTI_to_hive1 2017-03-10 19:44:19,277 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source RTI 2017-03-10 19:44:19,277 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:92)] RTI TaildirSource source starting with directory: {f1=/home/retailteg/flume/flumeSpool/initial_insert} 2017-03-10 19:44:19,281 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:75)] Initializing ReliableTaildirEventReader with directory={f1=/home/retailteg/flume/flumeSpool/initial_insert}, metaDir={} 2017-03-10 19:44:19,283 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:83)] taildirCache: [{filegroup='f1', filePattern='/home/retailteg/flume/flumeSpool/initial_insert', cached=true}] 2017-03-10 19:44:19,285 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:84)] headerTable: {} 2017-03-10 19:44:19,289 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 0 2017-03-10 19:44:19,290 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:94)] Updating position from position file: /home/retailteg/flume/taildir_position.json 2017-03-10 19:44:19,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting 2017-03-10 19:44:19,362 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TailFile.updatePos(TailFile.java:126)] Updated position, file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884 2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:118)] TaildirSource started 2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: RTI: Successfully registered new MBean. 2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: RTI started 2017-03-10 19:44:19,366 (PollableSourceRunner-TaildirSource-RTI) [DEBUG - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:127)] Polling runner starting. Source:Taildir source: { positionFile: /home/retailteg/flume/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } 2017-03-10 19:46:19,459 (PollableSourceRunner-TaildirSource-RTI) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884 -- This message was sent by Atlassian JIRA (v6.3.15#6346)