Hi,

I have been using Hadoop 2.4.1 and Flume 1.5.0.1 for experimenting, and I am
pretty new to these tools. I have flume-conf.properties as below:

agentMe.channels = memory-channel
agentMe.sources = my-source
agentMe.sinks = log-sink hdfs-sink

agentMe.channels.memory-channel.type = memory
agentMe.channels.memory-channel.capacity = 1000
agentMe.channels.memory-channel.transactionCapacity = 100

agentMe.sources.my-source.type = syslogtcp
#agentMe.sources.my-source.bind = 192.168.7.129
agentMe.sources.my-source.port = 8100
agentMe.sources.my-source.channels = memory-channel

# Define a sink that outputs to logger.
agentMe.sinks.log-sink.channel = memory-channel
agentMe.sinks.log-sink.type = logger

# Define a sink that outputs to hdfs.
agentMe.sinks.hdfs-sink.channel = memory-channel
agentMe.sinks.hdfs-sink.type = hdfs
agentMe.sinks.hdfs-sink.hdfs.path = hdfs://localhost:54310/user/raghuveer/science
agentMe.sinks.hdfs-sink.hdfs.fileType = DataStream
agentMe.sinks.hdfs-sink.hdfs.batchSize = 2
agentMe.sinks.hdfs-sink.hdfs.rollCount = 0
agentMe.sinks.hdfs-sink.hdfs.rollSize = 0
agentMe.sinks.hdfs-sink.hdfs.rollInterval = 3
agentMe.sinks.hdfs-sink.hdfs.writeFormat = Text
#agentMe.sinks.hdfs-sink.hdfs.path = /user/raghuveer/%y-%m-%d/%H%M/%S


and I am trying to make a simple call from the following client code:


public class FlumeClient {

    /** Entry point: delivers ten sample events to a remote Flume agent. */
    public static void main(String[] args) {

        final MyRpcClientFacade client = new MyRpcClientFacade();
        // Point the client at the remote Flume agent's host and port.
        client.init("192.X.X.54", 8100);

        // Send ten copies of the sample message. The receiving agent is
        // expected to listen with an AvroSource on that port.
        final String sampleData = "Hello Flume from Raghuveer...!";
        int sent = 0;
        while (sent < 10) {
            client.sendDataToFlume(sampleData);
            sent++;
        }

        // Release the RPC connection once all events have been handed off.
        client.cleanUp();
    }
}



import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyRpcClientFacade {

    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of
the
        // above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);


        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

When I make the call, it fails with the following exception:

ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.hdfs.HDFSEventSink.process:467)  - process failed
java.lang.UnsupportedOperationException: Not implemented by the
DistributedFileSystem FileSystem implementation
    at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
    at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
    at
org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
    at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
    at
org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
13 Oct 2014 12:53:38,685 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
event. Exception follows.
org.apache.flume.EventDeliveryException:
java.lang.UnsupportedOperationException: Not implemented by the
DistributedFileSystem FileSystem implementation
    at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
    at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the
DistributedFileSystem FileSystem implementation
    at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
    at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
    at
org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
    at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
    at
org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)



Kindly help me out.

Regards.

Reply via email to