Hi, I'm not aware of anyone having tested the RollingSink with anything besides "hdfs://" and "file://". That the file is empty is strange. Is something like recoverLease() necessary for your custom HCFS?
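A minimal sketch of the branch Vijay describes, under a simplified model: SimpleFs, LeaseRecoverable, InMemoryFs, LeaseFs and LeaseAwareRestore are hypothetical names, not Hadoop or Flink classes. In HDFS the lease-recovery role is played by DistributedFileSystem.recoverLease(Path). The point is only that a file system which does not expose such a call skips that step and goes straight to the rename.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal file-system abstraction (NOT Hadoop's FileSystem API).
interface SimpleFs {
    boolean exists(String path);
    void rename(String from, String to);
}

// Hypothetical capability interface; in HDFS this role is played by
// DistributedFileSystem.recoverLease(Path).
interface LeaseRecoverable {
    boolean recoverLease(String path);
}

// In-memory stand-in for a custom HCFS that has no lease concept.
class InMemoryFs implements SimpleFs {
    final Map<String, String> files = new HashMap<>();

    public boolean exists(String path) {
        return files.containsKey(path);
    }

    public void rename(String from, String to) {
        files.put(to, files.remove(from));
    }
}

// In-memory stand-in for an HDFS-like file system that supports lease recovery.
class LeaseFs extends InMemoryFs implements LeaseRecoverable {
    int recoveries = 0;

    public boolean recoverLease(String path) {
        recoveries++; // a real file system would start recovering the old writer's lease here
        return true;
    }
}

final class LeaseAwareRestore {
    // Finalizes an in-progress part file; returns true if lease recovery was attempted.
    static boolean finalizeInProgress(SimpleFs fs, String inProgress, String finalPath) {
        boolean attempted = false;
        if (fs instanceof LeaseRecoverable) {
            // Only lease-aware file systems get this step.
            ((LeaseRecoverable) fs).recoverLease(inProgress);
            attempted = true;
        }
        // Every file system reaches the rename; for one without lease recovery,
        // what the renamed file contains depends only on its own semantics.
        if (fs.exists(inProgress)) {
            fs.rename(inProgress, finalPath);
        }
        return attempted;
    }
}
```

Keying the check on a capability rather than on one concrete class is only one possible design; whether a given HCFS needs an equivalent of lease recovery at all depends on how it treats files whose writer died.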
Cheers,
Aljoscha

On Wed, 23 Mar 2016 at 17:53 Vijay Srinivasaraghavan <vijikar...@yahoo.com.invalid> wrote:

> Hi Aljoscha,
> My bad: I had copied some wrong class files during one of the steps. I retried the same steps that I mentioned earlier, and now I can see all the debug statements that I added to the RollingSink.
> I have found another interesting issue. I am using an HCFS (Hadoop Compatible File System) implementation that we have built in-house (not stock HDFS). As part of the recovery process in the restoreState() method of the RollingSink class, the sink invokes the recoverLease() API, which is available only in DistributedFileSystem (or a subclass), whereas our HCFS implementation class extends FileSystem. Since that code path does not invoke recoverLease() for our HCFS implementation class, I am seeing the part file with empty content, even though the file is renamed from "in-progress" to the actual file name.
> Question: Do you know if the RollingSink implementation has been tested with any Hadoop Compatible File System, such as GlusterFS?
> Regards,
> Vijay
>
> On Wednesday, March 23, 2016 7:42 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>
> Hmm, that's strange. Could you maybe send one of the TaskManager logs?
>
> Cheers,
> Aljoscha
>
> On 23 Mar 2016, at 15:28, Vijay <vijikar...@yahoo.com.INVALID> wrote:
> >
> > Yes, I have updated it on all cluster nodes and restarted the entire cluster.
> >
> > Do you see any problems with the steps that I followed?
> >
> > Regards,
> > Vijay
> >
> > Sent from my iPhone
> >
> >> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>
> >> Hi,
> >> did you update the log4j.properties file on all nodes where the TaskManagers run, and did you restart the whole cluster?
> >>
> >> Cheers,
> >> Aljoscha
> >>> On 23 Mar 2016, at 15:02, Vijay <vijikar...@yahoo.com.INVALID> wrote:
> >>>
> >>> Hi Aljoscha,
> >>>
> >>> I am using a standalone Flink cluster (3 nodes). I am running the Flink job by submitting/uploading the jar through the Flink UI.
> >>>
> >>> I have built Flink from Maven and modified the RollingSink code to add new debug statements.
> >>>
> >>> I have also packaged the streaming file system connector package (including the RollingSink changes) into the job jar file. The changes include both System.out and logger statements.
> >>>
> >>> Updated the log4j properties file to DEBUG.
> >>>
> >>> Regards,
> >>> Vijay
> >>>
> >>> Sent from my iPhone
> >>>
> >>>> On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>
> >>>> Hi,
> >>>> what were the steps you took? By the way, are you running this on YARN or in standalone mode? How are you starting the Flink job? Do you still not see DEBUG entries in the log?
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 23 Mar 2016, at 14:32, Vijay <vijikar...@yahoo.com> wrote:
> >>>>>
> >>>>> I have changed the properties file, but it did not help.
> >>>>>
> >>>>> Regards,
> >>>>> Vijay
> >>>>>
> >>>>> Sent from my iPhone
> >>>>>
> >>>>>> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>
> >>>>>> Ok, then you should be able to change the log level to DEBUG in conf/log4j.properties.
> >>>>>>
> >>>>>>> On 23 Mar 2016, at 12:41, Vijay <vijikar...@yahoo.com> wrote:
> >>>>>>>
> >>>>>>> I think only the ERROR category gets displayed in the log file.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Vijay
> >>>>>>>
> >>>>>>> Sent from my iPhone
> >>>>>>>
> >>>>>>>> On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>> are you seeing the regular log output from the RollingSink in the TaskManager logs?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>> On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan <vijikar...@yahoo.com> wrote:
> >>>>>>>>>
> >>>>>>>>> I have tried both the log4j logger and the System.out.println option, but neither of them worked.
> >>>>>>>>>
> >>>>>>>>> From what I have seen so far, the file system streaming connector classes are not packaged in the grand jar (flink-dist_2.10-1.1-SNAPSHOT.jar) that is copied to the <FLINK_HOME>/build-target/lib location as part of the Flink Maven build step.
> >>>>>>>>>
> >>>>>>>>> So, I manually copied (overwrote) the compiled class files from the org.apache.flink.streaming.connectors.fs package into my "Flink job" distribution jar (otherwise it was using the standard jars that are defined as Maven dependencies in Artifactory) and then uploaded the jar to the JobManager.
> >>>>>>>>>
> >>>>>>>>> Am I missing something? How do I enable logging for the RollingSink class?
> >>>>>>>>>
> >>>>>>>>> <dependency>
> >>>>>>>>>   <groupId>org.apache.flink</groupId>
> >>>>>>>>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
> >>>>>>>>>   <version>${flink.version}</version>
> >>>>>>>>>   <scope>provided</scope>
> >>>>>>>>> </dependency>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tuesday, March 22, 2016 3:04 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>> how are you printing the debug statements?
> >>>>>>>>>
> >>>>>>>>> But yeah, all the logic of renaming in-progress files and cleaning up after a failed job happens in restoreState(BucketState state). The steps are roughly these:
> >>>>>>>>>
> >>>>>>>>> 1. Move the current in-progress file to its final location.
> >>>>>>>>> 2. Truncate the file if necessary (if truncate is not available, write a .valid-length file).
> >>>>>>>>> 3. Move the pending files that were part of the checkpoint to their final location.
> >>>>>>>>> 4. Clean up any leftover pending/in-progress files.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>>> On 22 Mar 2016, at 10:08, Vijay Srinivasaraghavan <vijikar...@yahoo.com.INVALID> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>> I have enabled checkpointing and I am using RollingSink to sink the data from KafkaConsumer to HDFS (2.7.x). To simulate failover/recovery, I stopped a TaskManager, and the job gets rescheduled to another TaskManager instance. At that moment, the current "in-progress" file gets closed and renamed from _part-0-1_in-progress to part-0-1.
> >>>>>>>>>> I was hoping to see the debug statements that I added to the "restoreState" method, but none of them gets printed. I am not sure if the restoreState() method gets invoked in this scenario. Could you please help me understand the flow during a "failover" scenario?
> >>>>>>>>>> P.S.: Functionally the code appears to be working fine, but I am trying to understand the underlying implementation details.
> >>>>>>>>>> public void restoreState(BucketState state)
> >>>>>>>>>> Regards
> >>>>>>>>>> Vijay
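The four restoreState() steps Aljoscha lists in the 22 March reply can be sketched against an in-memory map of path to contents. This is a simplified model, not Flink's implementation: CheckpointedBucket and BucketRestorer are hypothetical names, string length stands in for byte length, and the "_<name>_in-progress", "_<name>_pending" and "_<name>.valid-length" naming is assumed for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical checkpointed state for one bucket (illustrative, not Flink's BucketState).
final class CheckpointedBucket {
    final String currentFile;        // final name of the file that was in progress
    final long currentValidLength;   // length of currentFile covered by the checkpoint
    final List<String> pendingFiles; // final names of files pending at checkpoint time

    CheckpointedBucket(String currentFile, long currentValidLength, List<String> pendingFiles) {
        this.currentFile = currentFile;
        this.currentValidLength = currentValidLength;
        this.pendingFiles = pendingFiles;
    }
}

final class BucketRestorer {
    // Assumed naming scheme, for illustration only.
    static String inProgress(String name) { return "_" + name + "_in-progress"; }
    static String pending(String name) { return "_" + name + "_pending"; }
    static String validLength(String name) { return "_" + name + ".valid-length"; }

    private final Map<String, String> files; // path -> contents (chars stand in for bytes)
    private final boolean truncateSupported;

    BucketRestorer(Map<String, String> files, boolean truncateSupported) {
        this.files = files;
        this.truncateSupported = truncateSupported;
    }

    void restore(CheckpointedBucket state) {
        String current = state.currentFile;

        // 1. Move the in-progress file to its final location.
        if (files.containsKey(inProgress(current))) {
            files.put(current, files.remove(inProgress(current)));
        }

        // 2. Drop data written after the checkpoint: truncate if possible,
        //    otherwise record the valid length in a side file.
        String data = files.get(current);
        if (data != null && data.length() > state.currentValidLength) {
            if (truncateSupported) {
                files.put(current, data.substring(0, (int) state.currentValidLength));
            } else {
                files.put(validLength(current), Long.toString(state.currentValidLength));
            }
        }

        // 3. Move pending files that were part of the checkpoint.
        for (String name : state.pendingFiles) {
            if (files.containsKey(pending(name))) {
                files.put(name, files.remove(pending(name)));
            }
        }

        // 4. Clean up leftover pending/in-progress files not covered by the checkpoint.
        files.keySet().removeIf(p -> p.endsWith("_in-progress") || p.endsWith("_pending"));
    }
}
```

With truncateSupported set to false, the final file keeps its extra data and a reader is expected to honor the .valid-length side file instead.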