[ 
https://issues.apache.org/jira/browse/FLUME-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951969#comment-15951969
 ] 

zhiqiangzhao commented on FLUME-3074:
-------------------------------------

Thanks a lot for your comment.
Does that mean it is wrong to use the fileHeader value as the key?
What confuses me is that the same setup actually runs normally on Linux.

What I want is as follows:
a) I have many logs in different directories, and the file names look like 
timestamp.dat.
b) I want to assign one number as the partition id for a group of directories 
in order to share the load, so that every log file from those directories is 
sent to the configured Kafka partition.
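
To make the requirement concrete, here is a rough sketch of the mapping I have 
in mind, written as plain Java with no Flume dependency. The class name 
DirectoryPartitioner, its methods, and any directories or partition numbers 
passed to it are made up for illustration; none of this is an existing Flume 
API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Flume): maps the directory of a log file
// to a fixed Kafka partition id chosen in configuration.
final class DirectoryPartitioner {
    // Directory prefix -> partition id, checked in insertion order.
    private final Map<String, Integer> partitionsByDir = new LinkedHashMap<>();
    private final int defaultPartition;

    DirectoryPartitioner(int defaultPartition) {
        this.defaultPartition = defaultPartition;
    }

    // Registers a directory and the partition its files should go to.
    DirectoryPartitioner assign(String directory, int partition) {
        partitionsByDir.put(normalize(directory), partition);
        return this;
    }

    // Returns the partition id as a decimal string, since the stack trace
    // shows the sink passing the header value to Integer.parseInt.
    String partitionFor(String filePath) {
        String path = normalize(filePath);
        for (Map.Entry<String, Integer> entry : partitionsByDir.entrySet()) {
            // Append "/" so that ".../app1" does not also match ".../app10".
            if (path.startsWith(entry.getKey() + "/")) {
                return Integer.toString(entry.getValue());
            }
        }
        return Integer.toString(defaultPartition);
    }

    // Treats Windows and Linux separators alike and drops a trailing separator.
    private static String normalize(String path) {
        String unified = path.replace('\\', '/');
        return unified.endsWith("/")
                ? unified.substring(0, unified.length() - 1)
                : unified;
    }
}
```

The first registered directory that matches wins, and the comparison is case 
sensitive. I assume something like this could run in a custom interceptor that 
writes the result into the header named by partitionIdHeader, while the file 
path stays in a different header, but I have not tried that.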

Can you give some suggestions?


> format error happened on windows when kafka sink is used
> --------------------------------------------------------
>
>                 Key: FLUME-3074
>                 URL: https://issues.apache.org/jira/browse/FLUME-3074
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.7.0
>         Environment: windows 10
>            Reporter: zhiqiangzhao
>
> When I use the Kafka sink to push logs from a Windows path to a Kafka server, 
> I get the format error shown below.
> I think it is because the Windows path was parsed in Linux style.
> 17 Mar 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to publish events
>       at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
>       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
>       at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
>       ... 3 more
> Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
>       at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>       at java.lang.Integer.parseInt(Integer.java:580)
>       at java.lang.Integer.parseInt(Integer.java:615)
>       at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
>       ... 3 more
> My config text is
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> tier1.sources = s1
> tier1.channels = c1  
> tier1.sinks = sk1  
>   
> tier1.sources.s1.type = spooldir  
> tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
> tier1.sources.s1.fileHeader = true  
> tier1.sources.s1.fileHeaderKey = key
> tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
> tier1.sources.s1.deserializer.maxBlobLength = 2000000000
> tier1.sources.s1.deserializer.maxBackoff=30000
> tier1.sources.s1.channels = c1  
> tier1.channels.c1.type = memory  
> tier1.channels.c1.capacity = 10004  
> tier1.channels.c1.transactionCapacity = 100
> tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sk1.topic = winSink
> tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
> tier1.sinks.sk1.partitionIdHeader = key
> tier1.sinks.sk1.channel = c1
> tier1.sinks.sk1.batchSize = 20



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
