Jovan Kilibarda created FLUME-2809:
--------------------------------------

             Summary: sink.kite.DatasetSink MalformedInputException
                 Key: FLUME-2809
                 URL: https://issues.apache.org/jira/browse/FLUME-2809
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.7.0
         Environment: Ubuntu VM:
uname -a
Linux ub64-master 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

            Reporter: Jovan Kilibarda


I get the following error when trying to use org.apache.flume.sink.kite.DatasetSink:

# apache-flume-1.7.0-SNAPSHOT-bin/logs/flume.log
08 Oct 2015 15:19:39,991 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SOURCE, name: spooldir-src started
08 Oct 2015 15:19:40,183 ERROR [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256)  - FATAL: Spool Directory source spooldir-src: { spoolDir: /var/flume/spooldir/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
08 Oct 2015 15:19:40,550 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize:147)  - Opened output appender AvroAppender{path=file:/home/bob/kite_demo/dns_data/.2d20b2e7-64f2-4d09-87da-aa12120518f7.avro.tmp, schema="bytes", fileSystem=org.apache.hadoop.fs.LocalFileSystem@15e5a95d, enableCompression=true, dataFileWriter=org.apache.avro.file.DataFileWriter@18d223e6, writer=org.apache.avro.reflect.ReflectDatumWriter@c32ddc5} for file:/home/bob/kite_demo/dns_data/2d20b2e7-64f2-4d09-87da-aa12120518f7.avro
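# If I read the stack trace right, the source thread dies inside the
# deserializer before any event reaches the channel: LineDeserializer decodes
# the spooled file as UTF-8 text, and a binary Avro container is not valid
# UTF-8. A minimal standalone sketch of that failure mode (class name is mine;
# my assumption is that the decoder reports malformed input rather than
# replacing it, which is what the trace suggests):

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class Utf8DecodeDemo {
  public static void main(String[] args) throws Exception {
    // Avro container magic bytes plus a stray continuation byte, as found
    // in binary Avro data; 0x90 on its own is not a valid UTF-8 sequence.
    ByteBuffer in = ByteBuffer.wrap(new byte[] { 'O', 'b', 'j', 1, (byte) 0x90 });
    StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .decode(in, CharBuffer.allocate(16), true)
        .throwException();  // throws java.nio.charset.MalformedInputException
  }
}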


# Here is how I get into this problem...

# This is flume configuration file
# conf/flume.kite.conf
a5.sources = spooldir-src
a5.sinks = sink1
a5.channels = mem-ch1
a5.channels.mem-ch1.type = memory
a5.channels.mem-ch1.capacity = 10000
a5.channels.mem-ch1.transactionCapacity = 1000
a5.sources.spooldir-src.type = spooldir
a5.sources.spooldir-src.spoolDir = /var/flume/spooldir/
a5.sources.spooldir-src.deletePolicy = immediate
a5.sources.spooldir-src.channels = mem-ch1
a5.sources.spooldir-src.selector.type = replicating
a5.sources.spooldir-src.interceptors = i1
a5.sources.spooldir-src.interceptors.i1.type = org.apache.flume.interceptor.ibInterceptor$Builder
a5.sources.spooldir-src.interceptors.i1.preserveExisting = false
a5.sources.spooldir-src.interceptors.i1.header = flume.avro.schema.literal
a5.sources.spooldir-src.interceptors.i1.schema = /var/schema/dns_data.avsc
a5.sinks.sink1.type = org.apache.flume.sink.kite.DatasetSink
a5.sinks.sink1.channel = mem-ch1
a5.sinks.sink1.kite.dataset.uri = dataset:file:/home/bob/kite_demo/dns_data
a5.sinks.sink1.kite.entityParser = avro
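# (A hunch about the root cause, untested: per the Flume User Guide, the
# spooling directory source's deserializer defaults to LINE, i.e. it reads
# files as text, which would explain the MalformedInputException on a binary
# Avro container. The guide also documents an AVRO deserializer that puts the
# schema into the event header itself; if I read it right, something like the
# following sketch should make my interceptor unnecessary:)
a5.sources.spooldir-src.deserializer = AVRO
a5.sources.spooldir-src.deserializer.schemaType = LITERAL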


cat  /var/schema/dns_data.avsc
"bytes"

# Start Flume
apache-flume-1.7.0-SNAPSHOT-bin$ ./bin/flume-ng agent -c conf -f conf/flume.kite.conf -n a5

# Receive a text file with one line only
# captured-dns.txt
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;

# copy it to /home/bob/tmp/captured-dns_filtered
# serialize it using the generic "bytes" schema
java  -jar  ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar  fromtext  /home/bob/tmp/captured-dns_filtered  /home/bob/tmp/captured-dns_filtered.avro
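# (As I understand the fromtext tool, it writes each input line as one "bytes"
# datum into an Avro container file; roughly equivalent to this sketch, with
# the class name mine and the file name taken from the command above:)

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;

public class FromTextSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = Schema.create(Schema.Type.BYTES);
    try (DataFileWriter<ByteBuffer> writer =
             new DataFileWriter<>(new GenericDatumWriter<ByteBuffer>(schema))) {
      writer.create(schema, new File("/home/bob/tmp/captured-dns_filtered.avro"));
      // one "bytes" datum per input line
      String line = "19-Sep-2015 01:14:23.190 client ...";
      writer.append(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
    }
  }
}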

# The produced Avro file looks OK; here is the schema pulled out of it:
java  -jar  ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar  getschema /var/flume/spooldir/captured-dns_filtered.avro
"bytes"

# and the data
java  -jar  ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar  totext /var/flume/spooldir/captured-dns_filtered.avro  -
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;

# So, move the Avro file to the Flume spool folder
mv  /home/bob/tmp/captured-dns_filtered.avro  /var/flume/spooldir/
# ... and the log shown above happens.

# I added an interceptor, which seems to be configured correctly: I can see
# its debug messages when I start Flume.
/**
   * Only {@link ibInterceptor.Builder} can build me
   */
  private ibInterceptor(boolean preserveExisting, boolean useIP,
                        String header, String schema)
  {
    this.preserveExisting = preserveExisting;
    this.header = header;
    this.schema = schema;
    // (host and logger are fields declared elsewhere in this class)
    InetAddress addr;
    System.out.println("\n-ibInterceptor header=" + header);
    System.out.println("-ibInterceptor schema=" + schema);
    try {
      addr = InetAddress.getLocalHost();
      if (useIP) {
        host = addr.getHostAddress();
        System.out.println("-ibInterceptor host=" + host);
      } else {
        host = addr.getCanonicalHostName();
      }
    } catch (UnknownHostException e) {
      logger.warn("Could not get local host address. Exception follows.", e);
    }
  }
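# (Not shown above: the Builder that i1.type in the config points at. A
# reconstructed sketch of what it presumably looks like; the property names
# match the flume.kite.conf entries, everything else is my guess, and it
# assumes imports of org.apache.flume.Context and
# org.apache.flume.interceptor.Interceptor:)

  public static class Builder implements Interceptor.Builder {
    private boolean preserveExisting;
    private boolean useIP;
    private String header;
    private String schema;

    @Override
    public void configure(Context context) {
      // read the interceptors.i1.* properties from the agent config
      preserveExisting = context.getBoolean("preserveExisting", false);
      useIP = context.getBoolean("useIP", false);
      header = context.getString("header", "flume.avro.schema.literal");
      schema = context.getString("schema");
    }

    @Override
    public Interceptor build() {
      return new ibInterceptor(preserveExisting, useIP, header, schema);
    }
  }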

# The intercept() method, however, is never called: the println at its very
# beginning never shows up, though per the javadocs it should. The same custom
# interceptor works fine with other kinds of sinks, such as file_roll.
  /**
   * Modifies events in-place.
   */
  @Override
  public Event intercept(Event event)
  {
    System.out.println("--intercept()");

    if (schema != null) {
      // attach the Avro schema literal to this event's headers
      String schema_def = "bytes";
      event.getHeaders().put(header, schema_def);
      System.out.println("-intercept(): schema_def=" + schema_def);
    }

    return event;
  }
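# (Side note: the config passes i1.schema = /var/schema/dns_data.avsc, but
# intercept() hardcodes "bytes". A hedged sketch of reading the literal from
# the configured file instead; loadSchemaLiteral is a hypothetical helper, not
# part of the posted class, and assumes imports of java.io.IOException,
# java.nio.file.Files, java.nio.file.Paths, and
# java.nio.charset.StandardCharsets:)

  // Hypothetical helper: load the schema literal from the configured path,
  // so the header carries whatever the .avsc file actually contains.
  private static String loadSchemaLiteral(String path) throws IOException {
    return new String(Files.readAllBytes(Paths.get(path)),
        StandardCharsets.UTF_8).trim();
  }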

# The only purpose of my interceptor is to insert the schema into the Flume
# event header; without it, Flume complains:
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kite.policy.RetryPolicy.handle:39)  - Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.

# That makes sense based on the Kite DatasetSink docs.

Am I doing something wrong?


