Hi, (sending from my other handle since the apache mail relay seems to be down for me)
I’m not aware of anyone having tested the RollingSink with anything besides
“hdfs://” and “file://”. That the file is empty is strange. Is something like
recoverLease() necessary for your custom HCFS?

Cheers,
Aljoscha

> On 23 Mar 2016, at 17:53, Vijay Srinivasaraghavan
> <vijikar...@yahoo.com.INVALID> wrote:
>
> Hi Aljoscha,
> My mistake: I had copied the wrong class files during one of the steps. I
> have retried the steps that I mentioned earlier, and I can now see all the
> debug statements that I added to the RollingSink.
>
> I have found another interesting issue here. I am using an HCFS (Hadoop
> Compatible File System) implementation that we have built in-house (not
> stock HDFS). As part of the recovery process in the restoreState() method of
> the RollingSink class, the code tries to invoke the recoverLease() API, which
> is available only in DistributedFileSystem (or any subclass of it), whereas
> the class that we have implemented for our HCFS extends FileSystem. Since the
> code path does not invoke recoverLease() for our HCFS implementation class, I
> am seeing a part file with empty content even though the file is renamed from
> "in-progress" to the actual file name.
>
> Question: Do you know if the RollingSink implementation has been tested with
> any Hadoop Compatible File System such as GlusterFS?
>
> Regards
> Vijay
>
> On Wednesday, March 23, 2016 7:42 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hmm, that’s strange. Could you maybe send one of the TaskManager logs?
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 15:28, Vijay <vijikar...@yahoo.com.INVALID> wrote:
>>
>> Yes, I have updated it on all cluster nodes and restarted the entire cluster.
>>
>> Do you see any problems with the steps that I followed?
>>
>> Regards,
>> Vijay
>>
>> Sent from my iPhone
>>
>>> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi,
>>> did you update the log4j.properties file on all nodes where the
>>> TaskManagers run and did you restart the whole cluster?
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 23 Mar 2016, at 15:02, Vijay <vijikar...@yahoo.com.INVALID> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> I am using a standalone Flink cluster (3 nodes). I am running the Flink
>>>> job by submitting/uploading the jar through the Flink UI.
>>>>
>>>> I have built Flink with Maven and modified the RollingSink code to add
>>>> new debug statements.
>>>>
>>>> I have also packaged the streaming file system connector package
>>>> (including the RollingSink changes) into the job jar file. The changes
>>>> include both System.out and logger statements.
>>>>
>>>> I updated the log4j properties file to DEBUG.
>>>>
>>>> Regards,
>>>> Vijay
>>>>
>>>> Sent from my iPhone
>>>>
>>>>> On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> Hi,
>>>>> what were the steps you took? By the way, are you running this on YARN
>>>>> or in standalone mode? How are you starting the Flink job? Do you still
>>>>> not see DEBUG entries in the log?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 23 Mar 2016, at 14:32, Vijay <vijikar...@yahoo.com> wrote:
>>>>>>
>>>>>> I have changed the properties file but it did not help.
>>>>>>
>>>>>> Regards,
>>>>>> Vijay
>>>>>>
>>>>>> Sent from my iPhone
>>>>>>
>>>>>>> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ok, then you should be able to change the log level to DEBUG in
>>>>>>> conf/log4j.properties.
>>>>>>>
>>>>>>>> On 23 Mar 2016, at 12:41, Vijay <vijikar...@yahoo.com> wrote:
>>>>>>>>
>>>>>>>> I think only the ERROR category gets displayed in the log file.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vijay
>>>>>>>>
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>>> On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> are you seeing the regular log output from the RollingSink in the
>>>>>>>>> TaskManager logs?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan
>>>>>>>>>> <vijikar...@yahoo.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I have tried both the log4j logger and System.out.println, but
>>>>>>>>>> neither of them worked.
>>>>>>>>>>
>>>>>>>>>> From what I have seen so far, the filesystem streaming connector
>>>>>>>>>> classes are not packaged in the grand jar
>>>>>>>>>> (flink-dist_2.10-1.1-SNAPSHOT.jar) that is copied to the
>>>>>>>>>> <FLINK_HOME>/build-target/lib location as part of the Flink Maven
>>>>>>>>>> build step.
>>>>>>>>>>
>>>>>>>>>> So, I manually copied (overwrote) the compiled class files from the
>>>>>>>>>> org.apache.flink.streaming.connectors.fs package into my "Flink
>>>>>>>>>> job" distribution jar (otherwise it was using the standard jars that
>>>>>>>>>> are defined as a mvn dependency in Artifactory) and then uploaded
>>>>>>>>>> the jar to the JobManager.
>>>>>>>>>>
>>>>>>>>>> Am I missing something? How do I enable logging for the RollingSink
>>>>>>>>>> class?
>>>>>>>>>>
>>>>>>>>>> <dependency>
>>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>>>   <scope>provided</scope>
>>>>>>>>>> </dependency>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tuesday, March 22, 2016 3:04 AM, Aljoscha Krettek
>>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> how are you printing the debug statements?
>>>>>>>>>>
>>>>>>>>>> But yeah, all the logic of renaming in-progress files and cleaning
>>>>>>>>>> up after a failed job happens in restoreState(BucketState state).
>>>>>>>>>> The steps are roughly these:
>>>>>>>>>>
>>>>>>>>>> 1. Move the current in-progress file to its final location
>>>>>>>>>> 2. Truncate the file if necessary (if truncate is not available,
>>>>>>>>>>    write a .valid-length file)
>>>>>>>>>> 3. Move pending files that were part of the checkpoint to their
>>>>>>>>>>    final location
>>>>>>>>>> 4. Clean up any leftover pending/in-progress files
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>>> On 22 Mar 2016, at 10:08, Vijay Srinivasaraghavan
>>>>>>>>>>> <vijikar...@yahoo.com.INVALID> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>> I have enabled checkpointing and I am using RollingSink to sink
>>>>>>>>>>> the data to HDFS (2.7.x) from KafkaConsumer. To simulate
>>>>>>>>>>> failover/recovery, I stopped a TaskManager and the job got
>>>>>>>>>>> rescheduled to another TaskManager instance. At this moment, the
>>>>>>>>>>> current "in-progress" file gets closed and renamed from
>>>>>>>>>>> _part-0-1_in-progress to part-0-1.
>>>>>>>>>>> I was hoping to see the debug statements that I have added to the
>>>>>>>>>>> "restoreState" method, but none of them gets printed. I am not
>>>>>>>>>>> sure whether the restoreState() method gets invoked in this
>>>>>>>>>>> scenario. Could you please help me understand the flow during a
>>>>>>>>>>> "failover" scenario?
>>>>>>>>>>> P.S.: Functionally the code appears to be working fine, but I am
>>>>>>>>>>> trying to understand the underlying implementation details.
>>>>>>>>>>> public void restoreState(BucketState state)
>>>>>>>>>>> Regards
>>>>>>>>>>> Vijay
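
For readers following the thread, the four recovery steps that Aljoscha lists for restoreState(BucketState state) can be sketched roughly as below. This is a simulation on the local filesystem using only java.nio, not the RollingSink code itself. The class name RecoverySketch, the restore(...) signature, and the "_pending" suffix are hypothetical; only the "_part-0-1_in-progress" naming is taken from the thread. The sketch always truncates and does not model the .valid-length fallback or any HDFS lease handling.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Local-filesystem simulation of the four recovery steps; not Flink's code. */
class RecoverySketch {
    // Naming assumptions: the in-progress form follows the thread
    // ("_part-0-1_in-progress"); the pending suffix is hypothetical.
    static final String PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = "_in-progress";
    static final String PENDING_SUFFIX = "_pending";

    static void restore(Path dir, String currentPart, long validLength,
                        List<String> pendingParts) throws IOException {
        // 1. Move the current in-progress file to its final location.
        Path inProgress = dir.resolve(PREFIX + currentPart + IN_PROGRESS_SUFFIX);
        Path finalPart = dir.resolve(currentPart);
        if (Files.exists(inProgress)) {
            Files.move(inProgress, finalPart, StandardCopyOption.REPLACE_EXISTING);
        }

        // 2. Truncate to the length recorded in the checkpoint; bytes written
        //    after the checkpoint are discarded.
        if (Files.exists(finalPart) && Files.size(finalPart) > validLength) {
            try (FileChannel channel = FileChannel.open(finalPart, StandardOpenOption.WRITE)) {
                channel.truncate(validLength);
            }
        }

        // 3. Move pending files that were part of the checkpoint.
        for (String part : pendingParts) {
            Path pending = dir.resolve(PREFIX + part + PENDING_SUFFIX);
            if (Files.exists(pending)) {
                Files.move(pending, dir.resolve(part), StandardCopyOption.REPLACE_EXISTING);
            }
        }

        // 4. Clean up leftover pending/in-progress files. Collect first so
        //    nothing is deleted while the directory is still being listed.
        List<Path> leftovers = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, PREFIX + "*")) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                if (name.endsWith(IN_PROGRESS_SUFFIX) || name.endsWith(PENDING_SUFFIX)) {
                    leftovers.add(p);
                }
            }
        }
        for (Path p : leftovers) {
            Files.delete(p);
        }
    }
}
```

Calling RecoverySketch.restore(dir, "part-0-1", 5L, pendingParts) on a directory that contains _part-0-1_in-progress would leave part-0-1 cut to its first five bytes. On a filesystem without truncate support, step 2 would presumably write the valid length to a side file instead, as described in the thread.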