@Thees, could you open a JIRA to track this issue? Could you also describe the issue in more detail in the JIRA? For example, when you mention that "HdfsWriter opens a Bucket for every new file", do you mean that HdfsWriter opens a new file every time a new event is sent via HdfsSystemProducer? Please also attach the full config and container logs to the JIRA.
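To make the two readings concrete, here is a minimal Java sketch of the behavior the quoted documentation describes for write.batch.size.bytes. It is a hypothetical model, not Samza's HdfsWriter code (the class BatchRolloverSketch and its methods are made up for illustration). Records are appended to the current file until its size reaches the threshold, and only then does the next write open a new file.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (NOT Samza's implementation) of size-based rollover:
 * records go to the current file until it reaches batchSizeBytes; only then
 * does the next write start a new file.
 */
public class BatchRolloverSketch {
  private final long batchSizeBytes;
  // One entry per "opened" file, holding the bytes written to it.
  private final List<Long> fileSizes = new ArrayList<>();
  private boolean needNewFile = true;

  public BatchRolloverSketch(long batchSizeBytes) {
    this.batchSizeBytes = batchSizeBytes;
  }

  /** Appends one record, opening a new file only when the previous one is full. */
  public void write(long recordBytes) {
    if (needNewFile) {
      fileSizes.add(0L); // stands in for opening a new HDFS file
      needNewFile = false;
    }
    int current = fileSizes.size() - 1;
    long newSize = fileSizes.get(current) + recordBytes;
    fileSizes.set(current, newSize);
    if (newSize >= batchSizeBytes) {
      needNewFile = true; // threshold reached: roll over on the next write
    }
  }

  public int filesOpened() {
    return fileSizes.size();
  }

  public static void main(String[] args) {
    // 67108864 bytes (64 MiB), as in the quoted config.
    BatchRolloverSketch writer = new BatchRolloverSketch(67108864L);
    for (int i = 0; i < 1000; i++) {
      writer.write(512); // 1,000 events of 512 bytes each
    }
    System.out.println("files opened: " + writer.filesOpened()); // files opened: 1
  }
}
```

Under this model, 1,000 small events fit in a single file; observing one file per event instead would suggest that something rolls the file on every write.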
Thanks!
-Yi

On Wed, Jul 27, 2016 at 12:31 AM, Thees Gieselmann <t.gieselm...@mytaxi.com> wrote:

> Hello,
>
> We are using the HdfsWriter provided by the samza-hdfs package to write
> events to HDFS.
> After the latest patches in version 0.10.1, the bug regarding closing files
> was fixed, but the Bucketer does not seem to work with any of the provided
> HdfsWriter implementations.
> Every new event sent to the HDFS output system creates a new file on HDFS.
> According to the documentation,
> "systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864"
> should take care of appending events to an existing file until the byte
> threshold is reached.
> Is this a known bug, or have I missed something in my implementation?
>
> Code snippet:
>
> # HDFS System
> systems.hdfsstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> systems.hdfsstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter
> systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864
> systems.hdfsstream.samza.msg.serde=json
>
> # The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
> systems.hdfsstream.producer.hdfs.base.output.dir=/user/hive/warehouse/foobar
>
> # Bucket into following
> systems.hdfsstream.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
> systems.hdfsstream.producer.hdfs.bucketer.date.path.format=yyyy
>
>
>   private final SystemStream outputStream;
>   outputStream = new SystemStream("hdfsstream", "foobarTask");
>
>   @Override
>   public void process(IncomingMessageEnvelope envelope, MessageCollector messageCollector,
>       TaskCoordinator taskCoordinator) throws ClassNotFoundException, SQLException
>   {
>     final Timer.Context context = registry.timer("foobar").time();
>     try
>     {
>       String incoming = (String) envelope.getMessage();
>       GsonBuilder gsonBuilder = new GsonBuilder()
>           .registerTypeAdapter(DateTime.class, new DateTimeConverter());
>       Gson gson = gsonBuilder.create();
>       SomeClass message = gson.fromJson(incoming, SomeClass.class);
>
>       try
>       {
>         messageCollector.send(new OutgoingMessageEnvelope(outputStream, gson.toJson(message)));
>         registry.counter("foobar").inc();
>       }
>       catch (Exception e)
>       {
>         LOGGER.error("error with message: ", e);
>         registry.counter("failedProcessCounter").inc();
>       }
>     }
>     finally
>     {
>       context.stop();
>     }
>   }
>
> Kind regards
> Thees Gieselmann
>
> --
> Board of Directors: Jan-Niclaus Mewes, Claas Heiland
> Commercial Register: HRB 110377
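The quoted config also sets bucketer.date.path.format=yyyy. As a rough check that date-based bucketing alone should not explain a new file per event, the sketch below assumes a path layout of <base dir>/<job name>/<formatted date>. That layout is an assumption; JobNameDateTimeBucketer may compose its paths differently. The method bucketDir and the job name foobar-job are hypothetical. With the pattern yyyy, any two events from the same year map to the same directory.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DateBucketSketch {
  /**
   * Hypothetical bucket path: <baseDir>/<jobName>/<date formatted with datePattern>.
   * This is an assumed layout for illustration, not Samza's code.
   */
  public static String bucketDir(String baseDir, String jobName, String datePattern,
                                 ZonedDateTime eventTime) {
    String datePath = eventTime.format(DateTimeFormatter.ofPattern(datePattern));
    return baseDir + "/" + jobName + "/" + datePath;
  }

  public static void main(String[] args) {
    String base = "/user/hive/warehouse/foobar"; // base.output.dir from the quoted config
    String job = "foobar-job";                   // hypothetical job name
    ZonedDateTime t1 = ZonedDateTime.of(2016, 7, 27, 0, 31, 0, 0, ZoneOffset.UTC);
    ZonedDateTime t2 = t1.plusSeconds(1);        // the next event, one second later
    String b1 = bucketDir(base, job, "yyyy", t1);
    String b2 = bucketDir(base, job, "yyyy", t2);
    System.out.println(b1);            // /user/hive/warehouse/foobar/foobar-job/2016
    System.out.println(b1.equals(b2)); // true
  }
}
```

If the bucketer works roughly this way, a yearly date path changes only once a year, so per-event files would more likely come from the writer's batch-size rollover than from the date component.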