What I am seeing is that for every event I send, a new file is created in HDFS. I expected the file handle to keep writing to the same file until it rolled over as specified in the configs. Am I doing something wrong? Here is what I see in the agent log:
12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956.tmp
12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956
12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957.tmp
12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957
12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958.tmp
12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958
12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959
12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960
12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961
12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962
12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963
12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964.tmp
12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964.tmp to hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964

My configuration:

foo.sources = avroSrc
foo.channels = memoryChannel
foo.sinks = hdfsSink

# For each one of the sources, the type is defined
foo.sources.avroSrc.type = avro

# The channel can be defined as follows.
foo.sources.avroSrc.channels = memoryChannel
foo.sources.avroSrc.bind = 0.0.0.0
foo.sources.avroSrc.port = 41414

# Each sink's type must be defined
foo.sinks.hdfsSink.type = hdfs
foo.sinks.hdfsSink.hdfs.path = hdfs://dsdb1:54310/flume/'%{host}'
foo.sinks.hdfsSink.file.Prefix = web
foo.sinks.hdfsSink.file.rollInterval = 600
foo.sinks.hdfsSink.file.Type = SequenceFile

# Specify the channel the sink should use
foo.sinks.hdfsSink.channel = memoryChannel

The client code:

public void sendDataToFlume(String data) {
    // Build a Flume event from the payload
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    // Attach the host name as a header so the sink can resolve %{host} in the path
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("host", hostName);
    event.setHeaders(headers);
    try {
        rpcClient.append(event);
    } catch (EventDeliveryException e) {
        connect();
    }
}

@Test
public void testAvroClient() throws InterruptedException {
    AvroClient aClient = new AvroClient();
    int i = 0;
    int j = 500;
    while (i++ < j) {
        aClient.sendDataToFlume("Hello");
        if (i == j / 2) {
            //Thread.sleep(30000);
        }
    }
}
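For reference, this is roughly how I understood the roll settings from the Flume HDFS sink documentation. I am not sure my file.* property names above are even read by the sink, so this is only a sketch of what I think the hdfs.*-prefixed names should look like (the 0 values are my assumption that 0 disables size- and count-based rolling):

# Sketch of the roll-related properties as I read the docs; names/values may be wrong
foo.sinks.hdfsSink.hdfs.filePrefix = web
foo.sinks.hdfsSink.hdfs.fileType = SequenceFile
foo.sinks.hdfsSink.hdfs.rollInterval = 600
foo.sinks.hdfsSink.hdfs.rollSize = 0
foo.sinks.hdfsSink.hdfs.rollCount = 0

If the sink only recognizes the hdfs.-prefixed names, then my file.rollInterval line above would be ignored and the default rolling behavior would apply, which could be related to the frequent file creation I am seeing.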