Jovan Kilibarda created FLUME-2809:
--------------------------------------

             Summary: sink.kite.DatasetSink MalformedInputException
                 Key: FLUME-2809
                 URL: https://issues.apache.org/jira/browse/FLUME-2809
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.7.0
         Environment: Ubuntu VM:
uname -a
Linux ub64-master 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
            Reporter: Jovan Kilibarda


I am getting this error when trying to use org.apache.flume.sink.kite.DatasetSink:

# apache-flume-1.7.0-SNAPSHOT-bin/logs/flume.log
08 Oct 2015 15:19:39,991 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96) - Component type: SOURCE, name: spooldir-src started
08 Oct 2015 15:19:40,183 ERROR [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source spooldir-src: { spoolDir: /var/flume/spooldir/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
	at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
	at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
	at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
	at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
	at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
08 Oct 2015 15:19:40,550 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize:147) - Opened output appender AvroAppender{path=file:/home/bob/kite_demo/dns_data/.2d20b2e7-64f2-4d09-87da-aa12120518f7.avro.tmp, schema="bytes", fileSystem=org.apache.hadoop.fs.LocalFileSystem@15e5a95d, enableCompression=true, dataFileWriter=org.apache.avro.file.DataFileWriter@18d223e6, writer=org.apache.avro.reflect.ReflectDatumWriter@c32ddc5} for file:/home/bob/kite_demo/dns_data/2d20b2e7-64f2-4d09-87da-aa12120518f7.avro
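# If it helps: as far as I can tell, MalformedInputException is what a strict UTF-8
# CharsetDecoder (CodingErrorAction.REPORT) throws when it hits bytes that are not valid
# UTF-8, and a binary avro container file is full of such bytes. A standalone snippet of
# my own (not Flume code) that reproduces the same kind of exception:

import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class MalformedInputDemo {
  public static void main(String[] args) throws Exception {
    // A strict decoder throws instead of substituting a replacement character
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .onUnmappableCharacter(CodingErrorAction.REPORT);
    // 0xFF can never occur in valid UTF-8, so this throws
    // java.nio.charset.MalformedInputException: Input length = 1
    decoder.decode(ByteBuffer.wrap(new byte[] { (byte) 0xFF }));
  }
}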
# Here is how I get into this problem...

# This is the flume configuration file
# conf/flume.kite.conf
a5.sources = spooldir-src
a5.sinks = sink1
a5.channels = mem-ch1

a5.channels.mem-ch1.type = memory
a5.channels.mem-ch1.capacity = 10000
a5.channels.mem-ch1.transactionCapacity = 1000

a5.sources.spooldir-src.type = spooldir
a5.sources.spooldir-src.spoolDir = /var/flume/spooldir/
a5.sources.spooldir-src.deletePolicy = immediate
a5.sources.spooldir-src.channels = mem-ch1
a5.sources.spooldir-src.selector.type = replicating
a5.sources.spooldir-src.interceptors = i1
a5.sources.spooldir-src.interceptors.i1.type = org.apache.flume.interceptor.ibInterceptor$Builder
a5.sources.spooldir-src.interceptors.i1.preserveExisting = false
a5.sources.spooldir-src.interceptors.i1.header = flume.avro.schema.literal
a5.sources.spooldir-src.interceptors.i1.schema = /var/schema/dns_data.avsc

a5.sinks.sink1.type = org.apache.flume.sink.kite.DatasetSink
a5.sinks.sink1.channel = mem-ch1
a5.sinks.sink1.kite.dataset.uri = dataset:file:/home/bob/kite_demo/dns_data
a5.sinks.sink1.kite.entityParser = avro

cat /var/schema/dns_data.avsc
"bytes"

# Start flume
apache-flume-1.7.0-SNAPSHOT-bin$ ./bin/flume-ng agent -c conf -f conf/flume.kite.conf -n a5

# Receive a text file with one line only
# captured-dns.txt
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;

# Copy it to /home/bob/tmp/captured-dns_filtered and serialize it using the generic schema
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar fromtext /home/bob/tmp/captured-dns_filtered /home/bob/tmp/captured-dns_filtered.avro

# The produced avro file looks OK; here is the schema pulled out of it
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar getschema /var/flume/spooldir/captured-dns_filtered.avro
"bytes"

# ... and the data
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar totext /var/flume/spooldir/captured-dns_filtered.avro -
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;

# So, move the avro file to the flume spool folder
mv /home/bob/tmp/captured-dns_filtered.avro /var/flume/spooldir/
# ... and the above log happens.
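# As a side note, the avro container written by the fromtext tool above can equally be
# produced with the Avro Java API. This is a sketch, assuming avro-1.7.7 on the classpath;
# the output path and the truncated record content are illustrative only:

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;

public class WriteBytesAvro {
  public static void main(String[] args) throws Exception {
    // The primitive "bytes" schema, the same one avro-tools fromtext uses
    Schema schema = new Schema.Parser().parse("\"bytes\"");
    DataFileWriter<ByteBuffer> writer =
        new DataFileWriter<ByteBuffer>(new GenericDatumWriter<ByteBuffer>(schema));
    writer.create(schema, new File("/tmp/captured-dns_filtered.avro"));
    // One record per input line, stored as raw bytes
    String line = "19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: ...";
    writer.append(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
    writer.close();
  }
}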
# I added an interceptor, which seems to be configured correctly: I can see its debug
# messages when I start flume.

/**
 * Only {@link ibInterceptor.Builder} can build me.
 */
private ibInterceptor(boolean preserveExisting, boolean useIP, String header, String schema) {
  this.preserveExisting = preserveExisting;
  this.header = header;
  this.schema = schema;
  InetAddress addr;
  System.out.println("\n-ibInterceptor header=" + header);
  System.out.println("-ibInterceptor schema=" + schema);
  try {
    addr = InetAddress.getLocalHost();
    if (useIP) {
      host = addr.getHostAddress();
      System.out.println("-ibInterceptor host=" + host);
    } else {
      host = addr.getCanonicalHostName();
    }
  } catch (UnknownHostException e) {
    logger.warn("Could not get local host address. Exception follows.", e);
  }
}

# However, the intercept() method is never called: the println message at its very
# beginning is never displayed, though it should be according to the Javadoc. The same
# custom interceptor works fine with other kinds of sinks, such as file_roll.

/**
 * Modifies events in-place.
 */
@Override
public Event intercept(Event event) {
  System.out.println("--intercept()");
  if (schema != null) {
    String schema_def = "bytes";
    event.getHeaders().put(header, schema_def);
    System.out.println("-intercept(): schema_def=" + schema_def);
  }
  return event;
}

# The only purpose of my interceptor is to insert the schema into the flume event header;
# without it, flume complains:

08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kite.policy.RetryPolicy.handle:39) - Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.

# ... which makes sense based on the Kite Dataset Sink docs.

Am I doing something wrong?
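# For completeness, this is roughly how the Builder side is wired up. Treat it as a
# sketch rather than my exact class: the field declarations and the configure() defaults
# are reconstructed by hand here, but the property keys mirror the i1.* settings in my
# config above.

// Inside the ibInterceptor class; org.apache.flume.Context and
// org.apache.flume.interceptor.Interceptor are the standard Flume types.
public static class Builder implements Interceptor.Builder {
  private boolean preserveExisting;
  private boolean useIP;
  private String header;
  private String schema;

  @Override
  public void configure(Context context) {
    // Keys match a5.sources.spooldir-src.interceptors.i1.* in the config above
    preserveExisting = context.getBoolean("preserveExisting", false);
    useIP = context.getBoolean("useIP", true);
    header = context.getString("header", "flume.avro.schema.literal");
    schema = context.getString("schema", null);
  }

  @Override
  public Interceptor build() {
    return new ibInterceptor(preserveExisting, useIP, header, schema);
  }
}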