Hi Vicky,
What underlying file system are you using? Have you checked that multiple instances of the job are not running concurrently, or tried using the job lock?

Also, the state store and state are separate things. You can still pass state from the source to the extractor without the state store enabled. The state store is used to carry state across executions, for example when a watermark is used to resume an incremental pull.

Hung.
________________________________
From: Vicky Kak <[email protected]>
Sent: Saturday, November 11, 2017 5:58:59 AM
To: [email protected]; [email protected]
Subject: Corrupted state file when Jobs are being run in parallel.

Hi Guys,
I have been running the stress tests and am seeing the following errors:

Error 1
*********************************************************************************************************************************
2017-11-11 11:20:56 UTC INFO [pool-11-thread-421] org.apache.hadoop.fs.FSInputChecker 284 - Found checksum error: b[0, 
512]=53455106196f72672e6170616368652e6861646f6f702e696f2e5465787425676f62626c696e2e72756e74696d652e4a6f6253746174652444617461736574537461746501012a6f72672e6170616368652e6861646f6f702e696f2e636f6d70726573732e44656661756c74436f6465630000000044e218b9e6aad3f1aa97f2210fb5c7f0ffffffff44e218b9e6aad3f1aa97f2210fb5c7f00109789c6304000002000209789c630000000100010b789cebb3d50200025100f68e0ab4789ced5b7b73dbc611971c3b8d5ff233b6d324ad861337e9d804013e4451692643d1924c51a26489962da71ece013890270238f870904479fc1592ffdb4fd1e9f4b364a6dfa3ff770f0fbe244384eca6c998d2f081bbddc5deede2f6777bcbcf974da275d8266ae1a543ce90c629dbf44cb3a9e48ae93daa3663fa9b4a419195ac5c2a147373a5a9a9e9e6df4adf3c0a3ee7ff39e5ff5da7172b1bebebd54663097aa6a6c52b995c9923b7333e79530e15f93954e41f8122d7fe0d6f8f6fe0805bb291855d0769f8aee14b961c102da17d4625576b630b5d7ae561d6954c64b7ce75d81742098639b4f036c348772835250b1dbae4084f672fba1c1a2d89e85f159031870d944fe7545d4be70b46313d5f9071ba24e772459445322aea331479bc2df96f1e33bf6d73eeb80b998c4d74506c1f3349a356c627ca4a72467c520637fa9e org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst at 0 exp: 36820587 got: 91149211 at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322) at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:278) at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:213) at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231) at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:195) at java.io.DataInputStream.readFully(DataInputStream.java:195) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845) at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759) at 
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773) at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119) at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173) at gobblin.runtime.JobContext.<init>(JobContext.java:136) at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131) at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62) at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80) at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59) at com.bph.JobLauncherResource.search(JobLauncherResource.java:107) at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186) at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141) at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286) at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167) at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56) at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56) at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81) at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126) at 
com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103) at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74) at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95) at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62) at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754) at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69) at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2017-11-11 11:20:56 UTC ERROR [pool-11-thread-421] com.bph.JobLauncherResource 110 - Job Id fk_137 failed while searching key beryls Failed to create job launcher: org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst at 0 
exp: 36820587 got: 91149211
2017-11-11 11:20:56 UTC INFO [pool-11-thread-402] gobblin.util.ExecutorsUtils 125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6bce96a5[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
2017-11-11 11:20:56 UTC INFO [pool-11-thread-402] gobblin.util.ExecutorsUtils 144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6bce96a5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
*********************************************************************************************************************************
Error 2:
*********************************************************************************************************************************
2017-11-10 10:24:10 UTC WARN [pool-11-thread-13] org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker 154 - Problem opening checksum file: file:/home/Installable/gobblin-dist/working-dir/state-store/YoutubePageExtractorPull_138/current.jst. 
Ignoring exception: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146) at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339) at org.apache.hadoop.io.SequenceFile$Reader.openFile(SequenceFile.java:1832) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773) at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119) at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173) at gobblin.runtime.JobContext.<init>(JobContext.java:136) at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131) at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62) at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80) at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59) at com.bph.JobLauncherResource.search(JobLauncherResource.java:107) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186) at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141) at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286) at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167) at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56) at 
com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56) at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81) at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126) at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59) at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50) at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103) at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74) at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95) at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62) at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754) at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69) at 
org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
2017-11-10 10:24:11 UTC ERROR [pool-11-thread-13] com.bph.JobLauncherResource 110 - Job Id yt_138 failed while searching key ostfold Failed to create job launcher: java.io.EOFException
*********************************************************************************************************************************
Error 3
*********************************************************************************************************************************
2017-11-10 13:38:49 UTC ERROR [Commit-thread-0] gobblin.runtime.SafeDatasetCommit 118 - Failed to persist dataset state for dataset of job job_TwitterPageExtractorPull_135_1510321111647
java.io.FileNotFoundException: Failed to rename /home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/_tmp_/current.jst to /home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/current.jst: src not found at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:173) at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:164) at gobblin.util.HadoopUtils.copyFile(HadoopUtils.java:333) at gobblin.metastore.FsStateStore.createAlias(FsStateStore.java:283) at gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:221) at gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:255) at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:115) at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:43) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
*********************************************************************************************************************************

These errors are seen during the stress tests, repeatedly for the same jobs. For our use case we can't afford to have jobs fail because of system issues like the ones above.

I did some basic investigation, and the issue appears to be caused by non-atomic operations on the state file, which has the extension .jst.

It seems we could disable the state store. I looked at the following code in gobblin.runtime.JobContext::createStateStore:
*********************************************************************************************************************************
if (jobProps.containsKey(ConfigurationKeys.STATE_STORE_ENABLED)
    && !Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.STATE_STORE_ENABLED))) {
  return new NoopDatasetStateStore(stateStoreFs, stateStoreRootDir);
} else {
  return new FsDatasetStateStore(stateStoreFs, stateStoreRootDir);
}
*********************************************************************************************************************************
It seems that by disabling the state store we may get past this issue, but in our case the source implementation passes information to the Extractor via the WorkUnitStoreState.

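To make the suspected non-atomic update concrete: Error 3 is consistent with two runs of the same job sharing the staging path `_tmp_/current.jst`, so that one run's rename removes the file the other run is about to rename. Below is a minimal sketch of a writer that avoids that pattern. It assumes a local POSIX file system and uses plain `java.nio.file` rather than Gobblin's Hadoop `FileSystem` code; `AtomicStateFileSketch` and `writeAtomically` are hypothetical names, not Gobblin APIs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; this is not how Gobblin's FsStateStore is written.
final class AtomicStateFileSketch {

  /** Writes to a writer-private temp file, then atomically renames it over the target. */
  static void writeAtomically(Path target, byte[] contents) throws IOException {
    Path dir = target.toAbsolutePath().getParent();
    Files.createDirectories(dir);
    // A unique temp name per call means concurrent writers never share a source path.
    Path tmp = Files.createTempFile(dir, "current", ".tmp");
    boolean moved = false;
    try {
      Files.write(tmp, contents);
      // With ATOMIC_MOVE, readers see either the old file or the new one, never a
      // partial write. Whether an existing target is replaced is implementation
      // specific; on POSIX file systems this maps to rename(2), which replaces it.
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
      moved = true;
    } finally {
      if (!moved) {
        Files.deleteIfExists(tmp); // clean up only our own temp file on failure
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final Path target = Files.createTempDirectory("state-store").resolve("current.jst");
    ExecutorService pool = Executors.newFixedThreadPool(8);
    List<Future<Void>> results = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
      final String payload = "state-from-writer-" + i;
      Callable<Void> task = () -> {
        writeAtomically(target, payload.getBytes(StandardCharsets.UTF_8));
        return null;
      };
      results.add(pool.submit(task));
    }
    for (Future<Void> result : results) {
      result.get(); // rethrows any writer failure
    }
    pool.shutdown();
    // The surviving file is one writer's complete payload (last rename wins).
    System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
  }
}
```

This only addresses torn or missing files; it does not decide which of two concurrent runs should win, so it complements rather than replaces preventing concurrent runs of the same job.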
We don't want the job retry feature and hence disabled it as explained here:
https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/#retry-properties

I was expecting that setting only the following would disable it:

workunit.retry.enabled=false

but we also have to set:

task.maxretries=0

As we don't rely on retries, would it not be good to have a flag that skips the following calls when workunit.retry.enabled=false?

1) Reading the initial value from the store
2) Committing the final state to the store

As mentioned above, we can't disable the state store, as we generate some data in the Source implementation and pass it to the corresponding Extractor implementation via State.

I anticipate having these issues in GAAS too. I will be working on a fix, as this is a critical issue for us.

Thanks,
Vicky
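
On the question raised at the top of the thread about concurrent runs of the same job: if all launches go through a single JVM, as the REST resource in the stack traces suggests, one option is to serialize launches per job name before the launcher touches the state store. The following is a minimal sketch under that single-process assumption. `PerJobLaunchGuard` and `runExclusively` are hypothetical names, not Gobblin classes, and this is not a substitute for Gobblin's own job lock when several processes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: allows at most one in-flight launch per job name in this JVM.
final class PerJobLaunchGuard {
  private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  /** Runs the launch while holding the lock for jobName; other job names are not blocked. */
  <T> T runExclusively(String jobName, Callable<T> launch) throws Exception {
    ReentrantLock lock = locks.computeIfAbsent(jobName, name -> new ReentrantLock());
    lock.lock();
    try {
      return launch.call();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    final PerJobLaunchGuard guard = new PerJobLaunchGuard();
    final AtomicInteger active = new AtomicInteger();
    final AtomicInteger maxActive = new AtomicInteger();

    // Stand-in for creating and running a job launcher.
    final Callable<Void> fakeLaunch = () -> {
      int now = active.incrementAndGet();
      maxActive.accumulateAndGet(now, Math::max);
      Thread.sleep(10);
      active.decrementAndGet();
      return null;
    };

    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<Void>> results = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      Callable<Void> guarded = () -> guard.runExclusively("FlickrPageExtractorPull_137", fakeLaunch);
      results.add(pool.submit(guarded));
    }
    for (Future<Void> result : results) {
      result.get();
    }
    pool.shutdown();
    System.out.println("max concurrent launches of one job: " + maxActive.get());
  }
}
```

The lock map grows by one entry per distinct job name and is never pruned, which is acceptable for a fixed set of jobs but would need eviction if job names are generated dynamically.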
