Some of the Hadoop config files (core-site.xml, hdfs-site.xml) do not have
the “impl” property set.
在 2014年10月15日,下午12:12,raghuveer sj <[email protected]> 写道:
> Hi,
>
> I have been experimenting with Hadoop 2.4.1 and Flume 1.5.0.1 and am
> pretty new to both. My flume-conf.properties is as below:
>
> agentMe.channels = memory-channel
> agentMe.sources = my-source
> agentMe.sinks = log-sink hdfs-sink
>
> agentMe.channels.memory-channel.type = memory
> agentMe.channels.memory-channel.capacity = 1000
> agentMe.channels.memory-channel.transactionCapacity = 100
>
> agentMe.sources.my-source.type = syslogtcp
> #agentMe.sources.my-source.bind = 192.168.7.129
> agentMe.sources.my-source.port = 8100
> agentMe.sources.my-source.channels = memory-channel
>
> # Define a sink that outputs to logger.
> agentMe.sinks.log-sink.channel = memory-channel
> agentMe.sinks.log-sink.type = logger
>
> # Define a sink that outputs to hdfs.
> agentMe.sinks.hdfs-sink.channel = memory-channel
> agentMe.sinks.hdfs-sink.type = hdfs
> agentMe.sinks.hdfs-sink.hdfs.path =
> hdfs://localhost:54310/user/raghuveer/science
> agentMe.sinks.hdfs-sink.hdfs.fileType = DataStream
> agentMe.sinks.hdfs-sink.hdfs.batchSize = 2
> agentMe.sinks.hdfs-sink.hdfs.rollCount = 0
> agentMe.sinks.hdfs-sink.hdfs.rollSize = 0
> agentMe.sinks.hdfs-sink.hdfs.rollInterval = 3
> agentMe.sinks.hdfs-sink.hdfs.writeFormat = Text
> #agentMe.sinks.hdfs-sink.hdfs.path = /user/raghuveer/%y-%m-%d/%H%M/%S
>
>
> and I am trying to make a simple call from the following client:
>
>
> public class FlumeClient {
>
> public static void main(String[] args) {
>
> MyRpcClientFacade client = new MyRpcClientFacade();
> // Initialize client with the remote Flume agent's host and port
> client.init("192.X.X.54", 8100);
>
> // Send 10 events to the remote Flume agent. That agent should be
> // configured to listen with an AvroSource.
> String sampleData = "Hello Flume from Raghuveer...!";
> for (int i = 0; i < 10; i++) {
> client.sendDataToFlume(sampleData);
> }
>
> client.cleanUp();
> }
> }
>
>
>
> import java.nio.charset.Charset;
>
> import org.apache.flume.Event;
> import org.apache.flume.EventDeliveryException;
> import org.apache.flume.api.RpcClient;
> import org.apache.flume.api.RpcClientFactory;
> import org.apache.flume.event.EventBuilder;
>
> public class MyRpcClientFacade {
>
> private RpcClient client;
> private String hostname;
> private int port;
>
> public void init(String hostname, int port) {
> // Setup the RPC connection
> this.hostname = hostname;
> this.port = port;
> this.client = RpcClientFactory.getDefaultInstance(hostname, port);
> // Use the following method to create a thrift client (instead of
> the
> // above line):
> // this.client = RpcClientFactory.getThriftInstance(hostname, port);
> }
>
> public void sendDataToFlume(String data) {
> // Create a Flume Event object that encapsulates the sample data
> Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
>
> // Send the event
> try {
> client.append(event);
> } catch (EventDeliveryException e) {
> // clean up and recreate the client
> client.close();
> client = null;
> client = RpcClientFactory.getDefaultInstance(hostname, port);
>
>
> }
> }
>
> public void cleanUp() {
> // Close the RPC connection
> client.close();
> }
> }
>
> When I make the call, it fails with the following exception:
>
> ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:467) - process failed
> java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
> at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
> at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
> at
> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
> at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 13 Oct 2014 12:53:38,685 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver
> event. Exception follows.
> org.apache.flume.EventDeliveryException:
> java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
> at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException: Not implemented by the
> DistributedFileSystem FileSystem implementation
> at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270)
> at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
> at
> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
> at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>
>
>
> Kindly help me out.
>
> Regards.