I propose an alternative approach to the 3 options mentioned above:

In AbstractFileOutputOperator we can introduce a flag,
isFileSystemAppendSupported.
This flag should be set based on the filePath in the setup or activate method.

It can be set in 2 ways:
1. Add if-else rules based on the file system (e.g. true for HDFS, false for
S3, etc.)
2. Attempt an append to a temp file and catch the exception.
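
A minimal sketch of both ways, using only the JDK so it runs without Hadoop. AppendProbe, AppendSupportDetector, byScheme and byProbe are hypothetical names for illustration, and the scheme list is an assumption (the thread only names FTP and S3):

```java
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical stand-in for the single file system call the probe needs. */
interface AppendProbe {
  /** Tries to append to the given temp path; throws if the append fails. */
  void tryAppend(String tempPath) throws IOException;
}

final class AppendSupportDetector {
  // Assumed scheme list for illustration; adjust to the file systems in use.
  private static final Set<String> NON_APPEND_SCHEMES =
      new HashSet<>(Arrays.asList("ftp", "s3", "s3n", "s3a"));

  /** Way 1: decide from the scheme of the file path. */
  static boolean byScheme(String filePath) {
    String scheme = URI.create(filePath).getScheme();
    // Treating a path without a scheme as append-capable is an assumption.
    return scheme == null
        || !NON_APPEND_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
  }

  /** Way 2: attempt an append on a temp file and catch the exception. */
  static boolean byProbe(AppendProbe probe, String tempPath) {
    try {
      probe.tryAppend(tempPath);
      return true;
    } catch (IOException | UnsupportedOperationException e) {
      // Any failure is read as "append not supported"; a transient I/O error
      // would also land here, which is a known weakness of probing.
      return false;
    }
  }
}
```

In the operator, tryAppend would presumably wrap fs.append() on a scratch file, and the result would be stored in isFileSystemAppendSupported once during setup.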

This flag will decide the openStream behavior. The advantage is that the flow
is predetermined rather than driven by exception handling.
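
Roughly, the dispatch could look like the sketch below. StreamOpener and FlagDrivenOpen are hypothetical stand-ins so that the example runs on the JDK alone; in the real operator the two branches would presumably be fs.append() and openStreamForNonAppendFS():

```java
import java.io.IOException;

/** Hypothetical stand-in for the two ways of reopening an existing file. */
interface StreamOpener<S> {
  /** Reopens the file through the file system's native append. */
  S append(String path) throws IOException;

  /** Reopens the file by copying it aside and recreating it. */
  S copyAndReopen(String path) throws IOException;
}

final class FlagDrivenOpen<S> {
  // Decided once (for example in setup) and never changed afterwards.
  private final boolean isFileSystemAppendSupported;
  private final StreamOpener<S> opener;

  FlagDrivenOpen(boolean isFileSystemAppendSupported, StreamOpener<S> opener) {
    this.isFileSystemAppendSupported = isFileSystemAppendSupported;
    this.opener = opener;
  }

  /** Picks the branch from the flag, not from a caught exception. */
  S openStream(String path) throws IOException {
    return isFileSystemAppendSupported
        ? opener.append(path)
        : opener.copyAndReopen(path);
  }
}
```

With this shape the non-append path is chosen up front, so a "Not Supported" exception never has to be caught inside openStream itself.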


~ Yogi

On 25 August 2016 at 11:17, Priyanka Gugale <priya...@datatorrent.com>
wrote:

> I would suggest we override "openStream" in GenericFileOutputOperator, as
> suggested in option 2, and then handle "append" differently for file systems
> that don't support append. Or else create concrete classes for all file
> systems that don't support append and override the required functions.
>
> -1 for modifying the abstract class to take care of unsupported operations.
>
> -Priyanka
>
> On Wed, Aug 24, 2016 at 6:21 PM, Chaitanya Chebolu <
> chaita...@datatorrent.com> wrote:
>
> > Hi All,
> >
> >     GenericFileOutputOperator, which is in the Malhar repository, works only
> > for a few file systems. GenericFileOutputOperator extends
> > AbstractFileOutputOperator.
> >
> > Reason: the openStream() method in AbstractFileOutputOperator calls the
> > append operation, but not all file systems support append. Examples of
> > file systems that do not support the append() operation are FTP and S3.
> >
> >   If GenericFileOutputOperator is used with a file system that does not
> > support the append() operation and the operator goes down and comes back
> > up, the file system throws a "Not Supported" exception.
> >
> > Solution: The following method needs to be called instead of fs.append():
> >
> >
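> > // IOUtils is assumed to be org.apache.commons.io.IOUtils.
> > protected FSDataOutputStream openStreamForNonAppendFS(Path filepath)
> >     throws IOException
> > {
> >   // Move the existing file aside so a fresh file can be created in its place.
> >   Path appendTmpFile = new Path(filepath + "_APPENDING");
> >   rename(filepath, appendTmpFile);
> >
> >   FSDataOutputStream fsOut = fs.create(filepath);
> >   // Copy the old contents into the new file; the input stream is closed
> >   // automatically when the try block exits.
> >   try (FSDataInputStream fsIn = fs.open(appendTmpFile)) {
> >     IOUtils.copy(fsIn, fsOut);
> >   }
> >   flush(fsOut);
> >
> >   // Non-recursive delete of the temporary copy.
> >   fs.delete(appendTmpFile, false);
> >   return fsOut;
> > }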
> >
> >
> > Below are the options to fix this issue.
> >
> > (1) Fix it in AbstractFileOutputOperator - Catch the "Not Supported"
> > exception and then call the openStreamForNonAppendFS() method.
> >
> > (2) Fix it in GenericFileOutputOperator (same as approach (1))
> >
> > (3) Create a new operator which extends AbstractFileOutputOperator and
> > overrides the openStream() method. This new operator would be used only for
> > file systems that do not support the append operation.
> >
> > Please share your thoughts and vote on the above approaches.
> >
> > Regards,
> > Chaitanya
> >
>
