[ https://issues.apache.org/jira/browse/FLUME-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951969#comment-15951969 ]
zhiqiangzhao commented on FLUME-3074: ------------------------------------- thanks a lot for your comment. That means it is not right I use fileHeader as the key? What I confused in that in fact it runs normally In Linux. What I want is as follows: a) I Know I have many logs in different directories,and file name is like timestamp.dat b) I want to set part of directories to use one number as partition id to load share,that means every log file will be put into configured kafka partition Can you give some suggestions? > format error happened on windows when kafka sink is used > -------------------------------------------------------- > > Key: FLUME-3074 > URL: https://issues.apache.org/jira/browse/FLUME-3074 > Project: Flume > Issue Type: Bug > Components: Sinks+Sources > Affects Versions: 1.7.0 > Environment: windows 10 > Reporter: zhiqiangzhao > > When I use kafka sink to push logs from windows path to kafka server, I meet > the format error as follows > I think it is because windows path was parsed in linux style。 > 17 三月 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] > (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver > event. Exception follows. > org.apache.flume.EventDeliveryException: Failed to publish events > at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252) > at > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flume.EventDeliveryException: Non integer partition id > specified > at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214) > ... 3 more > Caused by: java.lang.NumberFormatException: For input string: > "C:\Users\smart\Desktop\source\23.txt" > at > java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) > at java.lang.Integer.parseInt(Integer.java:580) > at java.lang.Integer.parseInt(Integer.java:615) > at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202) > ... 3 more > My config text is > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ > tier1.sources = s1 > tier1.channels = c1 > tier1.sinks = sk1 > > tier1.sources.s1.type = spooldir > tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source > tier1.sources.s1.fileHeader = true > tier1.sources.s1.fileHeaderKey = key > tier1.sources.s1.deserializer = > org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder > tier1.sources.s1.deserializer.maxBlobLength = 2000000000 > tier1.sources.s1.deserializer.maxBackoff=30000 > tier1.sources.s1.channels = c1 > tier1.channels.c1.type = memory > tier1.channels.c1.capacity = 10004 > tier1.channels.c1.transactionCapacity = 100 > tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink > tier1.sinks.sk1.topic = winSink > tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092 > tier1.sinks.sk1.partitionIdHeader = key > tier1.sinks.sk1.channel = c1 > tier1.sinks.sk1.batchSize = 20 -- This message was sent by Atlassian JIRA (v6.3.15#6346)