some  confige file (core-site.xml  ,   hdfs-site.xml)about hadoop   have not  
“impl” target
在 2014年10月15日,下午12:12,raghuveer sj <[email protected]> 写道:

> Hi,
> 
> I have been using hadoop 2.4.1 and flume 1.5.0.1 for experimenting and
> pretty new to these. I have flume-conf.properties as below:
> 
> agentMe.channels = memory-channel
> agentMe.sources = my-source
> agentMe.sinks = log-sink hdfs-sink
> 
> agentMe.channels.memory-channel.type = memory
> agentMe.channels.memory-channel.capacity = 1000
> agentMe.channels.memory-channel.transactionCapacity = 100
> 
> agentMe.sources.my-source.type = syslogtcp
> #agentMe.sources.my-source.bind = 192.168.7.129
> agentMe.sources.my-source.port = 8100
> agentMe.sources.my-source.channels = memory-channel
> 
> # Define a sink that outputs to logger.
> agentMe.sinks.log-sink.channel = memory-channel
> agentMe.sinks.log-sink.type = logger
> 
> # Define a sink that outputs to hdfs.
> agentMe.sinks.hdfs-sink.channel = memory-channel
> agentMe.sinks.hdfs-sink.type = hdfs
> agentMe.sinks.hdfs-sink.hdfs.path =
> hdfs://localhost:54310/user/raghuveer/science
> agentMe.sinks.hdfs-sink.hdfs.fileType = DataStream
> agentMe.sinks.hdfs-sink.hdfs.batchSize = 2
> agentMe.sinks.hdfs-sink.hdfs.rollCount = 0
> agentMe.sinks.hdfs-sink.hdfs.rollSize = 0
> agentMe.sinks.hdfs-sink.hdfs.rollInterval = 3
> agentMe.sinks.hdfs-sink.hdfs.writeFormat = Text
> #agentMe.sinks.hdfs-sink.hdfs.path = /user/raghuveer/%y-%m-%d/%H%M/%S
> 
> 
> and trying to make a simple call from
> 
> 
> public class FlumeClient {
> 
>    public static void main(String[] args) {
> 
>        MyRpcClientFacade client = new MyRpcClientFacade();
>        // Initialize client with the remote Flume agent's host and port
>        client.init("192.X.X.54", 8100);
> 
>        // Send 10 events to the remote Flume agent. That agent should be
>        // configured to listen with an AvroSource.
>        String sampleData = "Hello Flume from Raghuveer...!";
>        for (int i = 0; i < 10; i++) {
>            client.sendDataToFlume(sampleData);
>        }
> 
>        client.cleanUp();
>    }
> }
> 
> 
> 
> import java.nio.charset.Charset;
> 
> import org.apache.flume.Event;
> import org.apache.flume.EventDeliveryException;
> import org.apache.flume.api.RpcClient;
> import org.apache.flume.api.RpcClientFactory;
> import org.apache.flume.event.EventBuilder;
> 
> public class MyRpcClientFacade {
> 
>    private RpcClient client;
>    private String hostname;
>    private int port;
> 
>    public void init(String hostname, int port) {
>        // Setup the RPC connection
>        this.hostname = hostname;
>        this.port = port;
>        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
>        // Use the following method to create a thrift client (instead of
> the
>        // above line):
>        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
>    }
> 
>    public void sendDataToFlume(String data) {
>        // Create a Flume Event object that encapsulates the sample data
>        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
> 
>        // Send the event
>        try {
>            client.append(event);
>        } catch (EventDeliveryException e) {
>            // clean up and recreate the client
>            client.close();
>            client = null;
>            client = RpcClientFactory.getDefaultInstance(hostname, port);
> 
> 
>        }
>    }
> 
>    public void cleanUp() {
>        // Close the RPC connection
>        client.close();
>    }
> }
> 
> when i make the call the call fails with the following exception:
> 
> ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:467)  - process failed
> java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
>    at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
>    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
>    at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
>    at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
>    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>    at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
>    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
>    at
> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
>    at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
>    at
> org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
>    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
>    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>    at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>    at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>    at java.lang.Thread.run(Thread.java:745)
> 13 Oct 2014 12:53:38,685 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
> event. Exception follows.
> org.apache.flume.EventDeliveryException:
> java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
>    at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
>    at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>    at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
>    at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
>    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
>    at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
>    at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
>    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>    at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
>    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
>    at
> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
>    at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
>    at
> org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
>    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
>    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 
> 
> 
> Kindly help me out.
> 
> Regards.

Reply via email to