Did you build Flume yourself? If so, perhaps you did not build it using one of the hadoop2 profiles.
On Wed, Oct 15, 2014 at 12:09 AM, superwood <[email protected]> wrote: > some confige file (core-site.xml , hdfs-site.xml)about hadoop have > not “impl” target > 在 2014年10月15日,下午12:12,raghuveer sj <[email protected]> 写道: > > > Hi, > > > > I have been using hadoop 2.4.1 and flume 1.5.0.1 for experimenting and > > pretty new to these. I have flume-conf.properties as below: > > > > agentMe.channels = memory-channel > > agentMe.sources = my-source > > agentMe.sinks = log-sink hdfs-sink > > > > agentMe.channels.memory-channel.type = memory > > agentMe.channels.memory-channel.capacity = 1000 > > agentMe.channels.memory-channel.transactionCapacity = 100 > > > > agentMe.sources.my-source.type = syslogtcp > > #agentMe.sources.my-source.bind = 192.168.7.129 > > agentMe.sources.my-source.port = 8100 > > agentMe.sources.my-source.channels = memory-channel > > > > # Define a sink that outputs to logger. > > agentMe.sinks.log-sink.channel = memory-channel > > agentMe.sinks.log-sink.type = logger > > > > # Define a sink that outputs to hdfs. > > agentMe.sinks.hdfs-sink.channel = memory-channel > > agentMe.sinks.hdfs-sink.type = hdfs > > agentMe.sinks.hdfs-sink.hdfs.path = > > hdfs://localhost:54310/user/raghuveer/science > > agentMe.sinks.hdfs-sink.hdfs.fileType = DataStream > > agentMe.sinks.hdfs-sink.hdfs.batchSize = 2 > > agentMe.sinks.hdfs-sink.hdfs.rollCount = 0 > > agentMe.sinks.hdfs-sink.hdfs.rollSize = 0 > > agentMe.sinks.hdfs-sink.hdfs.rollInterval = 3 > > agentMe.sinks.hdfs-sink.hdfs.writeFormat = Text > > #agentMe.sinks.hdfs-sink.hdfs.path = /user/raghuveer/%y-%m-%d/%H%M/%S > > > > > > and trying to make a simple call from > > > > > > public class FlumeClient { > > > > public static void main(String[] args) { > > > > MyRpcClientFacade client = new MyRpcClientFacade(); > > // Initialize client with the remote Flume agent's host and port > > client.init("192.X.X.54", 8100); > > > > // Send 10 events to the remote Flume agent. 
That agent should be > > // configured to listen with an AvroSource. > > String sampleData = "Hello Flume from Raghuveer...!"; > > for (int i = 0; i < 10; i++) { > > client.sendDataToFlume(sampleData); > > } > > > > client.cleanUp(); > > } > > } > > > > > > > > import java.nio.charset.Charset; > > > > import org.apache.flume.Event; > > import org.apache.flume.EventDeliveryException; > > import org.apache.flume.api.RpcClient; > > import org.apache.flume.api.RpcClientFactory; > > import org.apache.flume.event.EventBuilder; > > > > public class MyRpcClientFacade { > > > > private RpcClient client; > > private String hostname; > > private int port; > > > > public void init(String hostname, int port) { > > // Setup the RPC connection > > this.hostname = hostname; > > this.port = port; > > this.client = RpcClientFactory.getDefaultInstance(hostname, port); > > // Use the following method to create a thrift client (instead of > > the > > // above line): > > // this.client = RpcClientFactory.getThriftInstance(hostname, > port); > > } > > > > public void sendDataToFlume(String data) { > > // Create a Flume Event object that encapsulates the sample data > > Event event = EventBuilder.withBody(data, > Charset.forName("UTF-8")); > > > > // Send the event > > try { > > client.append(event); > > } catch (EventDeliveryException e) { > > // clean up and recreate the client > > client.close(); > > client = null; > > client = RpcClientFactory.getDefaultInstance(hostname, port); > > > > > > } > > } > > > > public void cleanUp() { > > // Close the RPC connection > > client.close(); > > } > > } > > > > when i make the call the call fails with the following exception: > > > > ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] > > (org.apache.flume.sink.hdfs.HDFSEventSink.process:467) - process failed > > java.lang.UnsupportedOperationException: Not implemented by the > > DistributedFileSystem FileSystem implementation > > at 
org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214) > > at > org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365) > > at > > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375) > > at > > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392) > > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) > > at > > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) > > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) > > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) > > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) > > at > org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270) > > at > org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262) > > at > > org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718) > > at > > > org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183) > > at > > org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59) > > at > org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715) > > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > > at > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > at > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > at java.lang.Thread.run(Thread.java:745) > > 13 Oct 2014 12:53:38,685 ERROR > > [SinkRunner-PollingRunner-DefaultSinkProcessor] > > (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver > > event. Exception follows. 
> > org.apache.flume.EventDeliveryException: > > java.lang.UnsupportedOperationException: Not implemented by the > > DistributedFileSystem FileSystem implementation > > at > > org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471) > > at > > > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) > > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.UnsupportedOperationException: Not implemented by > the > > DistributedFileSystem FileSystem implementation > > at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214) > > at > org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365) > > at > > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375) > > at > > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392) > > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) > > at > > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) > > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) > > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) > > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) > > at > org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:270) > > at > org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262) > > at > > org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:718) > > at > > > org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183) > > at > > org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59) > > at > org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:715) > > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > > > > > > > > Kindly help me out. > > > > Regards. 
> > -- CONFIDENTIALITY NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
