Hi Hung,

Please find my replies inline.

>> What's the underlying file system you are using?

I am using the normal file system, not HDFS:
file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst

>> Have you checked to make sure there are not multiple instances of the job running concurrently or make use of the job lock?

Our use case requires multiple instances of the jobs to run in parallel ;) We have a custom service implementation that uses LinkedIn Rest.li to listen for incoming REST calls, and these REST calls launch the jobs. Most of the time there will be multiple instances of a job running concurrently. We have disabled the job lock. Please check this thread:
https://groups.google.com/forum/#!topic/gobblin-users/_bTxGa3wZaI

We started developing the solution last year, when GAAS had not yet been released, so we ended up building our own custom modules. They work fine, but started giving us trouble during the load test, which we discovered last week. Please check this one as well:
https://groups.google.com/forum/#!searchin/gobblin-users/vicky$20kak%7Csort:date/gobblin-users/kHrWh6lfGJM/iPBQSEcdBQAJ

>> Also, the state store and state are separate things. You can still pass state from the source to the extractor without the state store enabled. The state store is used to transfer state across
>> executions, like cases where a watermark is used to resume an incremental pull.

This is a relief; let me try it out and see what happens. I think this is what is required for our use case.

Thanks,
Vicky

On Sun, Nov 12, 2017 at 3:32 AM, Hung Tran <[email protected]> wrote:

> Hi Vicky,
>
> What's the underlying file system you are using?
>
> Have you checked to make sure there are not multiple instances of the job
> running concurrently or make use of the job lock?
>
> Also, the state store and state are separate things. You can still pass
> state from the source to the extractor without the state store enabled.
> The state store is used to transfer state across executions, like cases where a
> watermark is used to resume an incremental pull.
>
> Hung.
> ------------------------------
> *From:* Vicky Kak <[email protected]>
> *Sent:* Saturday, November 11, 2017 5:58:59 AM
> *To:* [email protected]; [email protected]
> *Subject:* Corrupted state file when Jobs are being run in parallel.
>
> Hi Guys,
>
> I have been running the stress tests and am seeing the following errors.
>
> Error 1
> ************************************************************
> 017-11-11 11:20:56 UTC INFO [pool-11-thread-421] org.apache.hadoop.fs.FSInputChecker 284 - Found checksum error: b[0, 512]=
> 53455106196f72672e6170616368652e6861646f6f702e696f2e54657874
> 25676f62626c696e2e72756e74696d652e4a6f6253746174652444617461
> 736574537461746501012a6f72672e6170616368652e6861646f6f702e69
> 6f2e636f6d70726573732e44656661756c74436f6465630000000044e218
> b9e6aad3f1aa97f2210fb5c7f0ffffffff44e218b9e6aad3f1aa97f2210f
> b5c7f00109789c6304000002000209789c630000000100010b789cebb3d5
> 0200025100f68e0ab4789ced5b7b73dbc611971c3b8d5ff233b6d324ad86
> 1337e9d804013e4451692643d1924c51a26489962da71ece013890270238
> f870904479fc1592ffdb4fd1e9f4b364a6dfa3ff770f0fbe244384eca6c9
> 98d2f081bbddc5deede2f6777bcbcf974da275d8266ae1a543ce90c629db
> f44cb3a9e48ae93daa3663fa9b4a419195ac5c2a147373a5a9a9e9e6df4a
> df3c0a3ee7ff39e5ff5da7172b1bebebd54663097aa6a6c52b995c9923b7
> 333e79530e15f93954e41f8122d7fe0d6f8f6fe0805bb291855d0769f8ae
> e14b961c102da17d4625576b630b5d7ae561d6954c64b7ce75d817420986
> 39b4f036c348772835250b1dbae4084f672fba1c1a2d89e85f159031870d
> 944fe7545d4be70b46313d5f9071ba24e772459445322aea331479bc2df9
> 6f1e33bf6d73eeb80b998c4d74506c1f3349a356c627ca4a72467c520637fa9e
> org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst at 0 exp: 36820587 got: 91149211
>     at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322)
>     at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:278)
>     at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:213)
>     at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231)
>     at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:195)
>     at java.io.DataInputStream.readFully(DataInputStream.java:195)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845)
>     at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
>     at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
>     at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
>     at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
>     at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173)
>     at gobblin.runtime.JobContext.<init>(JobContext.java:136)
>     at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
>     at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
>     at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80)
>     at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59)
>     at com.bph.JobLauncherResource.search(JobLauncherResource.java:107)
>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186)
>     at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141)
>     at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286)
>     at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167)
>     at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56)
>     at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56)
>     at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81)
>     at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126)
>     at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103)
>     at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74)
>     at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95)
>     at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62)
>     at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171)
>     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
>     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
>     at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
>     at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2017-11-11 11:20:56 UTC ERROR [pool-11-thread-421] com.bph.JobLauncherResource 110 - Job Id fk_137 failed while searching key beryls Failed to create job launcher: org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst at 0 exp: 36820587 got: 91149211
> 2017-11-11 11:20:56 UTC INFO [pool-11-thread-402] gobblin.util.ExecutorsUtils 125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6bce96a5[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-11-11 11:20:56 UTC INFO [pool-11-thread-402] gobblin.util.ExecutorsUtils 144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6bce96a5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> ************************************************************
>
> Error 2
> ************************************************************
> 2017-11-10 10:24:10 UTC WARN [pool-11-thread-13] org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker 154 - Problem opening checksum file: file:/home/Installable/gobblin-dist/working-dir/state-store/YoutubePageExtractorPull_138/current.jst. Ignoring exception:
> java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
>     at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
>     at org.apache.hadoop.io.SequenceFile$Reader.openFile(SequenceFile.java:1832)
>     at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752)
>     at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
>     at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
>     at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173)
>     at gobblin.runtime.JobContext.<init>(JobContext.java:136)
>     at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
>     at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
>     at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80)
>     at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59)
>     at com.bph.JobLauncherResource.search(JobLauncherResource.java:107)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186)
>     at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141)
>     at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286)
>     at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167)
>     at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56)
>     at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56)
>     at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81)
>     at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126)
>     at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
>     at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
>     at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103)
>     at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74)
>     at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95)
>     at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62)
>     at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171)
>     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
>     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
>     at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
>     at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2017-11-10 10:24:11 UTC ERROR [pool-11-thread-13] com.bph.JobLauncherResource 110 - Job Id yt_138 failed while searching key ostfold Failed to create job launcher: java.io.EOFException
> ************************************************************
>
> Error 3
> ************************************************************
> 2017-11-10 13:38:49 UTC ERROR [Commit-thread-0] gobblin.runtime.SafeDatasetCommit 118 - Failed to persist dataset state for dataset of job job_TwitterPageExtractorPull_135_1510321111647
> java.io.FileNotFoundException: Failed to rename /home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/_tmp_/current.jst to /home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/current.jst: src not found
>     at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:173)
>     at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:164)
>     at gobblin.util.HadoopUtils.copyFile(HadoopUtils.java:333)
>     at gobblin.metastore.FsStateStore.createAlias(FsStateStore.java:283)
>     at gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:221)
>     at gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:255)
>     at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:115)
>     at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:43)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> ************************************************************
>
> These errors are seen during the stress tests for the same jobs. For our
> use case we can't afford to have jobs fail due to system issues like the
> above. I did some basic investigation and found that the issue could be due
> to non-atomic operations on the state file, which has the extension .jst.
> It seems we could disable the state store; I looked at the following code in
> gobblin.runtime.JobContext::createStateStore
> ************************************************************
> if (jobProps.containsKey(ConfigurationKeys.STATE_STORE_ENABLED) &&
>     !Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.STATE_STORE_ENABLED))) {
>   return new NoopDatasetStateStore(stateStoreFs, stateStoreRootDir);
> } else {
>   return new FsDatasetStateStore(stateStoreFs, stateStoreRootDir);
> }
> ************************************************************
>
> It seems that by disabling the state store we may get past this issue, but
> in our case the Source implementation passes information to the
> Extractor via the WorkUnitStoreState.
>
> We don't want the job retry feature and hence disabled it as explained
> here:
> https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/#retry-properties
>
> I was expecting that setting only the following would disable it:
> workunit.retry.enabled=false
> but we have to set this as well:
> task.maxretries=0
> As we don't rely on retries, would it not be good to have a flag that
> skips the following calls when we have
> workunit.retry.enabled=false
>
> 1) Reading the initial value from the store
> 2) Committing the final state to the store
>
> As mentioned above, we can't disable the state store, as we are generating
> some data in the Source implementation and passing it to the corresponding
> Extractor implementation via State.
>
> I anticipate having these issues in GAAS too.
>
> I will be working to fix this issue, as it is critical for us.
>
> Thanks,
> Vicky
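
On the suspected non-atomic updates of the .jst file: Error 3 shows a rename from a shared `_tmp_/current.jst` path failing with "src not found", which is what one would expect if two runs of the same job use the same temporary file. Below is a minimal sketch of the usual way around that in plain Java NIO, where each writer gets its own uniquely named temp file and then renames it over the target in one step. This is not Gobblin code and not a patch for FsStateStore; the class name `AtomicStateWriter` and its `write` method are made up for illustration, and it assumes a local POSIX file system where a rename replaces the target atomically.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper, not part of Gobblin: publishes a state file atomically. */
public final class AtomicStateWriter {

  private AtomicStateWriter() {}

  /**
   * Writes data to a uniquely named temp file next to target, then renames it
   * over target, so a reader sees either the old or the new content in full.
   */
  public static void write(Path target, byte[] data) throws IOException {
    Path dir = target.toAbsolutePath().getParent();
    Files.createDirectories(dir);
    // Unique temp name per call: concurrent writers never share a temp file.
    Path tmp = Files.createTempFile(dir, "current-", ".tmp");
    try {
      Files.write(tmp, data);
      // On POSIX file systems this maps to rename(), which replaces an existing
      // target. Elsewhere the behaviour for an existing target is
      // implementation specific (see the Files.move javadoc).
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    } finally {
      Files.deleteIfExists(tmp); // no-op after a successful move
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("state-store");
    Path target = dir.resolve("current.jst");
    write(target, "run-1".getBytes(StandardCharsets.UTF_8));
    write(target, "run-2".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
  }
}
```

Note that this only keeps readers from seeing a half-written file. With several instances of the same job running in parallel, the last writer still wins, so any state carried across executions (a watermark, for example) would need a lock or per-run state paths on top of it.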
