@Thees,

Could you open a JIRA to track this issue? Could you also describe the issue
in more detail in the JIRA? For example, when you say that "HdfsWriter opens
a Bucket for every new file", do you mean that HdfsWriter opens a new file
every time a new event is sent via HdfsSystemProducer?
Could you also attach the full config and container logs to the JIRA?
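
For reference, here is a hypothetical Java sketch of the two behaviours being
compared. `SizeBasedFileRoller` is invented for illustration and is not
Samza's HdfsWriter or Bucketer code; it only models the semantics the
reporter expected from write.batch.size.bytes=67108864 (64 MiB): append
events to the current file until it reaches the threshold, then open a new
one. A 1-byte threshold models the reported one-file-per-event symptom.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is NOT Samza's HdfsWriter/Bucketer code.
// It models size-based batching: records are appended to the current file
// until that file holds at least batchSizeBytes, then a new file is opened.
final class SizeBasedFileRoller {
    private final long batchSizeBytes;
    // Bytes written to each simulated output file, in creation order.
    private final List<Long> fileSizes = new ArrayList<>();

    SizeBasedFileRoller(long batchSizeBytes) {
        if (batchSizeBytes <= 0) {
            throw new IllegalArgumentException("batchSizeBytes must be positive");
        }
        this.batchSizeBytes = batchSizeBytes;
    }

    // Appends one record and returns the index of the file it landed in.
    int write(byte[] record) {
        boolean needNewFile = fileSizes.isEmpty()
                || fileSizes.get(fileSizes.size() - 1) >= batchSizeBytes;
        if (needNewFile) {
            fileSizes.add(0L);
        }
        int current = fileSizes.size() - 1;
        fileSizes.set(current, fileSizes.get(current) + record.length);
        return current;
    }

    int fileCount() {
        return fileSizes.size();
    }

    public static void main(String[] args) {
        byte[] event = new byte[200]; // hypothetical ~200-byte JSON event

        // 64 * 1024 * 1024 = 67108864 bytes, the value from the reported config.
        SizeBasedFileRoller batched = new SizeBasedFileRoller(64L * 1024 * 1024);
        // A 1-byte threshold forces a new file for every event.
        SizeBasedFileRoller perEvent = new SizeBasedFileRoller(1);

        for (int i = 0; i < 1000; i++) {
            batched.write(event);
            perEvent.write(event);
        }
        System.out.println("batched files:   " + batched.fileCount());  // 1
        System.out.println("per-event files: " + perEvent.fileCount()); // 1000
    }
}
```

Under this model, 1000 small events fit into a single file at the configured
threshold; ending up with 1000 files instead would suggest the batch size is
not being applied.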

Thanks!

-Yi

On Wed, Jul 27, 2016 at 12:31 AM, Thees Gieselmann <t.gieselm...@mytaxi.com>
wrote:

> Hello,
>
> using the HdfsWriter provided by the samza-hdfs package, we are trying to
> write events to HDFS.
> With the latest patches in version 0.10.1, the bug regarding closing files
> was fixed. But the Bucketer does not seem to work with any of the provided
> HdfsWriter implementations.
> Every new event sent to the hdfsstream output system creates a new file
> on HDFS. According to the documentation,
> "systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864"
> should take care of appending events to an existing file until the
> byte threshold is reached.
> Is this a known bug, or have I missed something in my implementation?
>
> Code Snippet:
>
> > # HDFS System
> > systems.hdfsstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> > systems.hdfsstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter
> > systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864
> > systems.hdfsstream.samza.msg.serde=json
> >
> > # The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
> > systems.hdfsstream.producer.hdfs.base.output.dir=/user/hive/warehouse/foobar
> > # Bucket into following
> > systems.hdfsstream.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
> > systems.hdfsstream.producer.hdfs.bucketer.date.path.format=yyyy
>
> > // registry, LOGGER, SomeClass and DateTimeConverter are defined elsewhere
> > // in the task class.
> > private final SystemStream outputStream =
> >         new SystemStream("hdfsstream", "foobarTask");
> >
> > @Override
> > public void process(IncomingMessageEnvelope envelope,
> >         MessageCollector messageCollector, TaskCoordinator taskCoordinator)
> >         throws ClassNotFoundException, SQLException
> > {
> >     final Timer.Context context = registry.timer("foobar").time();
> >     try
> >     {
> >         String incoming = (String) envelope.getMessage();
> >         GsonBuilder gsonBuilder = new GsonBuilder()
> >                 .registerTypeAdapter(DateTime.class, new DateTimeConverter());
> >         Gson gson = gsonBuilder.create();
> >         SomeClass message = gson.fromJson(incoming, SomeClass.class);
> >
> >         try
> >         {
> >             // Send the re-serialized event to the "hdfsstream" system.
> >             messageCollector.send(
> >                     new OutgoingMessageEnvelope(outputStream, gson.toJson(message)));
> >             registry.counter("foobar").inc();
> >         }
> >         catch (Exception e)
> >         {
> >             LOGGER.error("error with message: ", e);
> >             registry.counter("failedProcessCounter").inc();
> >         }
> >     }
> >     finally
> >     {
> >         context.stop();
> >     }
> > }
>
>
>
> Kind regards
> Thees Gieselmann