[ 
https://issues.apache.org/jira/browse/GOBBLIN-14?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Tiwari reassigned GOBBLIN-14:
--------------------------------------

    Assignee: Hung Tran

> In YARN mode, starting from the second batch, the helixTasks state is 
> always IN_PROGRESS
> ------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-14
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-14
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: mazhiyong
>            Assignee: Hung Tran
>
> I use Gobblin version 0.10.0.
> I have an application that reads data from Kafka and writes it to HDFS. When 
> I run Gobblin on YARN, only the first batch is written to HDFS successfully; 
> for every later batch the helixTasks always stay in the IN_PROGRESS state.
> The Kafka topic receives only a few thousand records per 10 minutes.
> Is this a bug?
>  
> *Github Url* : https://github.com/linkedin/gobblin/issues/1926 
> *Github Reporter* : [~phenixmzy] 
> *Github Created At* : 2017-06-06T10:03:12Z 
> *Github Updated At* : 2017-06-10T14:10:25Z 
> h3. Comments 
> ----
> [~phenixmzy] wrote on 2017-06-06T10:05:15Z : ## These are my config files
> ### **Config File - conf/yarn/reference.conf**
> gobblin.home.dir=/opt/gobblin-dist
> gobblin.yarn.app.queue=default
> gobblin.yarn.app.name=GobblinYarn
> gobblin.yarn.app.master.memory.mbs=512
> gobblin.yarn.app.master.cores=1
> gobblin.yarn.app.report.interval.minutes=5
> gobblin.yarn.max.get.app.report.failures=4
> gobblin.yarn.email.notification.on.shutdown=false
> gobblin.yarn.initial.containers=1
> gobblin.yarn.container.memory.mbs=512
> gobblin.yarn.container.cores=1
> gobblin.yarn.container.affinity.enabled=true
> gobblin.yarn.helix.instance.max.retries=2
> gobblin.yarn.keytab.login.interval.minutes=1440
> gobblin.yarn.token.renew.interval.minutes=720
> gobblin.yarn.work.dir=/gobblin
> zookeeper.connection.string=host-xxx-32-201.meizu.com:2181,host-xxx-32-202.meizu.com:2181,host-xxx-32-203.meizu.com:2181
> zookeeper.session.timeout.seconds=180
> zookeeper.connection.timeout.seconds=30
> zookeeper.retry.backoff.seconds=1
> zookeeper.retry.count.max=10
> gobblin.cluster.helix.cluster.name=GobblinYarn
> gobblin.cluster.zk.connection.string=${zookeeper.connection.string}
> fs.uri=hdfs://xxxcluster
> job.execinfo.server.enabled=false
>  
> ### **Config File - conf/yarn/application.conf**
> gobblin.yarn.app.name=GobblinYarnTest
> gobblin.yarn.app.master.memory.mbs=256
> gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}/log4j-yarn.properties,${gobblin.yarn.conf.dir}/application.conf,${gobblin.yarn.conf.dir}/reference.conf
> gobblin.yarn.initial.containers=2
> gobblin.yarn.container.memory.mbs=512
> gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
> gobblin.yarn.container.affinity.enabled=true
> gobblin.yarn.helix.instance.max.retries=2
> gobblin.yarn.conf.dir=${gobblin.home.dir}/conf/yarn
> gobblin.yarn.lib.jars.dir=${gobblin.home.dir}/lib
> gobblin.yarn.logs.sink.root.dir=${gobblin.home.dir}/applogs
> gobblin.cluster.helix.cluster.name=GobblinYarnTest
> gobblin.cluster.job.conf.path=${gobblin.home.dir}/jobconf
> writer.fs.uri=${fs.uri}
> state.store.fs.uri=${fs.uri}
> writer.destination.type=HDFS
> writer.output.format=AVRO
> writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
> writer.output.dir=${gobblin.yarn.work.dir}/task-output
> data.publisher.type=gobblin.publisher.BaseDataPublisher
> data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
> data.publisher.replace.final.dir=false
> state.store.dir=${gobblin.yarn.work.dir}/state-store
> qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
> job.lock.enabled=true
> job.lock.type=gobblin.runtime.locks.ZookeeperBasedJobLock
> job.lock.dir=${gobblin.yarn.work.dir}/locks
> metrics.log.dir=${gobblin.yarn.work.dir}/metrics
> metrics.enabled=true
> ### **Config File - kafka-hdfs-on-yarn.pull**
> fs.uri=hdfs://xxxcluster
> mr.job.max.mappers=10
> mr.job.root.dir=${gobblin.yarn.work.dir}/working
> topic.whitelist=test
> kafka.brokers=host-xxx-32-201:9092,host-xxx-32-202:9092,host-xxx-32-203:9092
> bootstrap.with.offset=earliest
> job.name=GobblinKafkaQuickStart
> job.group=GobblinKafka
> job.description=Gobblin quick start job for Kafka
> job.lock.enabled=false
> job.schedule=0 0/3 * * * ?
> source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
> extract.namespace=gobblin.extract.kafka
> writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
> writer.file.path.type=tablename
> writer.destination.type=HDFS
> writer.output.format=txt
> writer.fs.uri=${fs.uri}
> metrics.reporting.file.enabled=true
> metrics.log.dir=${gobblin.yarn.work.dir}/metrics
> metrics.reporting.file.suffix=txt
> data.publisher.type=gobblin.publisher.BaseDataPublisher
> data.publisher.final.dir=/gobblintest/job-output
> data.publisher.replace.final.dir=false
> state.store.fs.uri=${fs.uri}
> state.store.dir=${gobblin.yarn.work.dir}/state-store
> task.data.root.dir=${gobblin.yarn.work.dir}/task-data 
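
One detail worth checking in the files above: kafka-hdfs-on-yarn.pull refers to ${gobblin.yarn.work.dir}, which is defined only in conf/yarn/reference.conf, and the log in the next comment prints the state-store path with that placeholder still unexpanded ("Getting dataset states from: hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/..."). Whether this is related to the IN_PROGRESS state is not established here. The sketch below is a hypothetical helper, not part of Gobblin, that lists placeholders a properties file uses but does not define itself. It assumes plain key=value lines and checks the file in isolation, which is a simplification of how Gobblin actually merges configuration.

```python
import re

# Matches ${some.key} placeholders inside a property value.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unresolved_placeholders(props):
    """Return {key: [names]} for ${name} references not defined in props."""
    missing = {}
    for key, value in props.items():
        names = [n for n in PLACEHOLDER.findall(value) if n not in props]
        if names:
            missing[key] = names
    return missing


# Three lines copied from kafka-hdfs-on-yarn.pull above.
pull_file = """
fs.uri=hdfs://xxxcluster
writer.fs.uri=${fs.uri}
state.store.dir=${gobblin.yarn.work.dir}/state-store
"""

report = unresolved_placeholders(parse_properties(pull_file))
print(report)
```

For these three lines the helper reports state.store.dir, because fs.uri is defined in the same file while gobblin.yarn.work.dir is not. Running it over the full job file would also list the other keys that use ${gobblin.yarn.work.dir}.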
>  
> *Github Url* : 
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306440651 
> ----
> [~phenixmzy] wrote on 2017-06-06T10:09:48Z : ## The Log
> 2017-06-06 17:18:47 CST INFO  [SchedulerService STARTING] 
> org.quartz.core.QuartzScheduler  - Scheduler meta-data: Quartz Scheduler 
> (v2.2.3) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
>   Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
>   NOT STARTED.
>   Currently in standby mode.
>   Number of jobs executed: 0
>   Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
>   Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support 
> persistence. and is not clustered.
> 2017-06-06 17:18:47 CST INFO  [SchedulerService STARTING] 
> org.quartz.impl.StdSchedulerFactory  - Quartz scheduler 
> 'DefaultQuartzScheduler' initialized from default resource file in Quartz 
> package: 'quartz.properties'
> 2017-06-06 17:18:47 CST INFO  [SchedulerService STARTING] 
> org.quartz.impl.StdSchedulerFactory  - Quartz scheduler version: 2.2.3
> 2017-06-06 17:18:47 CST INFO  [SchedulerService STARTING] 
> org.quartz.core.QuartzScheduler  - Scheduler 
> DefaultQuartzScheduler_$_NON_CLUSTERED started.
> 2017-06-06 17:18:47 CST INFO  [YarnService STARTING] 
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing 
> over to rm2
> 2017-06-06 17:18:47 CST INFO  [JobConfigurationManager STARTING] 
> gobblin.cluster.JobConfigurationManager  - Loaded 1 job configuration(s)
> 2017-06-06 17:18:47 CST INFO  [JobConfigurationManager STARTING] 
> gobblin.cluster.JobConfigurationManager  - Posting new JobConfig with name: 
> GobblinKafkaQuickStart and config: {extract.namespace=gobblin.extract.kafka, 
> ......
> 2017-06-06 17:18:47 CST INFO  [JobConfigurationManager STARTING] 
> gobblin.cluster.GobblinHelixJobScheduler  - Received new job configuration of 
> job GobblinKafkaQuickStart
> 2017-06-06 17:18:47 CST INFO  [JobConfigurationManager STARTING] 
> gobblin.cluster.GobblinHelixJobScheduler  - Scheduling job 
> GobblinKafkaQuickStart
> 2017-06-06 17:18:47 CST INFO  [JobConfigurationManager STARTING] 
> gobblin.scheduler.JobScheduler  - Scheduled job 
> GobblinKafka.GobblinKafkaQuickStart. Next run: Tue Jun 06 17:21:00 CST 2017.
> 2017-06-06 17:18:47 CST INFO  [YarnService STARTING] gobblin.yarn.YarnService 
>  - ApplicationMaster registration response: maximumCapability { memory: 5120 
> virtual_cores: 6 } queue: default scheduler_resource_types: MEMORY
> 2017-06-06 17:18:47 CST INFO  [YarnService STARTING] gobblin.yarn.YarnService 
>  - Requesting initial containers
> 2017-06-06 17:18:49 CST INFO  [AMRM Heartbeater thread] 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new token 
> for : mz-bigdata-32-227.meizu.com:45454
> 2017-06-06 17:18:49 CST INFO  [AMRM Heartbeater thread] 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new token 
> for : mz-bigdata-32-226.meizu.com:45454
> 2017-06-06 17:18:49 CST INFO  [AMRM Callback Handler Thread] 
> gobblin.yarn.YarnService$AMRMClientCallbackHandler  - Container 
> container_e46_1496740629024_0001_01_000002 has been allocated
> 2017-06-06 17:18:49 CST INFO  [AMRM Callback Handler Thread] 
> gobblin.yarn.YarnService$AMRMClientCallbackHandler  - Container 
> container_e46_1496740629024_0001_01_000003 has been allocated
> 2017-06-06 17:18:49 CST INFO  [ContainerLaunchExecutor] 
> gobblin.yarn.YarnService$AMRMClientCallbackHandler$1  - Starting container 
> container_e46_1496740629024_0001_01_000002
> 2017-06-06 17:18:49 CST INFO  [ContainerLaunchExecutor] 
> gobblin.yarn.YarnService$AMRMClientCallbackHandler$1  - Starting container 
> container_e46_1496740629024_0001_01_000003
> 2017-06-06 17:18:50 CST WARN  [ContainerLaunchExecutor] 
> gobblin.yarn.YarnService  - Path 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/container/_appjars
>  does not exist so no container LocalResource to add
> 2017-06-06 17:18:50 CST WARN  [ContainerLaunchExecutor] 
> gobblin.yarn.YarnService  - Path 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/container/_appjars
>  does not exist so no container LocalResource to add
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0] 
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor
>   - Processing Event EventType: START_CONTAINER for Container 
> container_e46_1496740629024_0001_01_000003
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] 
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor
>   - Processing Event EventType: START_CONTAINER for Container 
> container_e46_1496740629024_0001_01_000002
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] 
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy$ContainerManagementProtocolProxyData
>   - Opening proxy : mz-bigdata-32-227.meizu.com:45454
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0] 
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy$ContainerManagementProtocolProxyData
>   - Opening proxy : mz-bigdata-32-226.meizu.com:45454
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] 
> gobblin.yarn.YarnService$NMClientCallbackHandler  - Container 
> container_e46_1496740629024_0001_01_000002 has been started
> 2017-06-06 17:18:50 CST INFO  
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0] 
> gobblin.yarn.YarnService$NMClientCallbackHandler  - Container 
> container_e46_1496740629024_0001_01_000003 has been started
> 2017-06-06 17:19:05 CST INFO  
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
>  gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener  - 
> Live Helix participant instance: GobblinYarnTaskRunner_1
> 2017-06-06 17:19:07 CST INFO  
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
>  gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener  - 
> Live Helix participant instance: GobblinYarnTaskRunner_1
> 2017-06-06 17:19:07 CST INFO  
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
>  gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener  - 
> Live Helix participant instance: GobblinYarnTaskRunner_2
> 2017-06-06 17:21:00 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ClustersNames  - no default cluster mapping found
> 2017-06-06 17:21:00 CST WARN  [DefaultQuartzScheduler_Worker-1] 
> gobblin.runtime.AbstractJobLauncher  - Creating a job specific 
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:21:00 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> org.reflections.Reflections  - Reflections took 657 ms to scan 43 urls, 
> producing 645 keys and 2315 values 
> 2017-06-06 17:21:00 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> org.apache.hadoop.conf.Configuration  - user.name is deprecated. Instead, use 
> mapreduce.job.user.name
> 2017-06-06 17:21:00 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.reflection.GobblinConstructorUtils  - Found accessible 
> constructor for class class gobblin.runtime.FsDatasetStateStore with 
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class 
> java.lang.String, class java.lang.Integer, class 
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> gobblin.runtime.FsDatasetStateStore$3$1  - Getting dataset states from: 
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> org.apache.hadoop.io.compress.zlib.ZlibFactory  - Successfully loaded & 
> initialized native-zlib library
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new decompressor 
> [.deflate]
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new decompressor 
> [.deflate]
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new decompressor 
> [.deflate]
> 2017-06-06 17:21:00 CST INFO  [GetFsDatasetStateStore-] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new decompressor 
> [.deflate]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@4c9bb974
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@4c9bb974
> 2017-06-06 17:21:01 CST WARN  [DefaultQuartzScheduler_Worker-1] 
> gobblin.runtime.JobContext  - Property writer.staging.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:21:01 CST WARN  [DefaultQuartzScheduler_Worker-1] 
> gobblin.runtime.JobContext  - Property writer.output.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@3c553a00[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@3c553a00[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.kafka.client.Kafka08ConsumerClient  - Fetching topic metadata from 
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Discovered topic test
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@74b1f437[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:21:01 CST INFO  [pool-29-thread-1] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunit for 
> partition test:0: lowWatermark=659986, highWatermark=663501, range=3515
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@74b1f437[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunits for 1 
> topics in 0 seconds
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - Estimated avg time to pull a record for topic test is 0.146157 
> milliseconds
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - For all topics not pulled in the previous run, estimated avg time to pull 
> a record is 0.1461565963532356 milliseconds
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 1: estimated load=154.651281, partitions=[[test:0]]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 154.651281; 
> Diff = 99.998053%
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@1311b3f7
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@1311b3f7
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.runtime.AbstractJobLauncher  - Starting job 
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@53580f39[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@53580f39[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - runWorkUnits 
> 2017-06-06 17:21:02 CST INFO  [TaskStateCollectorService STARTING] 
> gobblin.runtime.TaskStateCollectorService  - Starting the 
> TaskStateCollectorService
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [ParallelRunner] 
> org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3b33222d
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3b33222d
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> org.apache.helix.task.TaskDriver  - Starting workflow GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - Created job queue 
> GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> org.apache.helix.task.TaskDriver  - Add job configuration 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> org.apache.helix.task.TaskUtil  - invoke rebalance for GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - Submitted job 
> job_GobblinKafkaQuickStart_1496740860051 to Helix
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion...
> 2017-06-06 17:21:02 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - workflowContext is null.
> 2017-06-06 17:21:02 CST INFO  [Thread-10] 
> org.apache.helix.task.WorkflowRebalancer  - Job 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 idealstate 
> already exists!
> 2017-06-06 17:21:03 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS 
> 2017-06-06 17:21:04 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS 
> 2017-06-06 17:21:05 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS 
> 2017-06-06 17:21:06 CST INFO  [Thread-10] 
> org.apache.helix.task.TaskRebalancer  - Cleaning up idealstate and 
> externalView for job: 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO  [Thread-10] 
> org.apache.helix.task.TaskRebalancer  - Successfully clean up 
> idealstate/externalView for resource 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051.
> 2017-06-06 17:21:06 CST INFO  [Thread-10] org.apache.helix.task.JobRebalancer 
>  - Workflow GobblinKafkaQuickStart or job 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 is already 
> failed or completed, workflow state (IN_PROGRESS), job state (COMPLETED), 
> clean up job IS.
> 2017-06-06 17:21:06 CST INFO  [Thread-10] 
> org.apache.helix.task.TaskRebalancer  - Cleaning up idealstate and 
> externalView for job: 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO  [Thread-10] 
> org.apache.helix.task.TaskRebalancer  - Successfully clean up 
> idealstate/externalView for resource 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051.
> 2017-06-06 17:21:06 CST WARN  [Thread-10] org.apache.helix.model.IdealState  
> - Resource 
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_9 does 
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST WARN  [Thread-10] org.apache.helix.model.IdealState  
> - Resource 
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_4 does 
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST WARN  [Thread-10] org.apache.helix.model.IdealState  
> - Resource 
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_7 does 
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion 
> helixJobState COMPLETED 
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - Job 
> job_GobblinKafkaQuickStart_1496740860051 completed
> 2017-06-06 17:21:06 CST INFO  [TaskStateCollectorService STOPPING] 
> gobblin.runtime.TaskStateCollectorService  - Stopping the 
> TaskStateCollectorService
> 2017-06-06 17:21:06 CST INFO  [TaskStateCollectorService STOPPING] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@2aca135b
> 2017-06-06 17:21:06 CST INFO  [TaskStateCollectorService STOPPING] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@2aca135b
> 2017-06-06 17:21:06 CST INFO  [TaskStateCollectorService STOPPING] 
> gobblin.runtime.TaskStateCollectorService  - Collected task state of 1 
> completed tasks
> 2017-06-06 17:21:06 CST INFO  [TaskStateCollectorService STOPPING] 
> gobblin.runtime.JobContext  - 1 more tasks of job 
> job_GobblinKafkaQuickStart_1496740860051 have completed
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.cluster.GobblinHelixJobLauncher  - Deleting persisted work units for 
> job job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.runtime.JobContext  - Persisting dataset urns.
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.reflection.GobblinConstructorUtils  - Found accessible 
> constructor for class class 
> gobblin.metastore.nameParser.SimpleDatasetUrnStateStoreNameParser with 
> parameter types [].
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] 
> gobblin.runtime.SafeDatasetCommit  - Committing dataset  of job 
> job_GobblinKafkaQuickStart_1496740860051 with commit policy 
> COMMIT_ON_FULL_SUCCESS and state SUCCESSFUL
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] 
> gobblin.publisher.BaseDataPublisher  - Moving 
> hdfs://mzcluster/gobblin/task-output/test/part.task_GobblinKafkaQuickStart_1496740860051_0.txt
>  to 
> /gobblintest/job-output/test/part.task_GobblinKafkaQuickStart_1496740860051_0.txt
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] gobblin.util.ExecutorsUtils  
> - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@197d7a98
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] gobblin.util.ExecutorsUtils  
> - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@197d7a98
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] 
> gobblin.runtime.SafeDatasetCommit  - Persisting dataset state for dataset 
> 2017-06-06 17:21:06 CST INFO  [Commit-thread-0] 
> gobblin.runtime.FsDatasetStateStore  - Persisting 
> job_GobblinKafkaQuickStart_1496740860051.jst to the job state store
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@43cfed92
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@43cfed92
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.JobLauncherUtils  - Cleaning up staging directory 
> /gobblin/task-staging/test
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.JobLauncherUtils  - Cleaning up output directory 
> /gobblin/task-output/test
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@144164c3
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@144164c3
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@25ca5791[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:06 CST INFO  [DefaultQuartzScheduler_Worker-1] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@25ca5791[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST WARN  [DefaultQuartzScheduler_Worker-2] 
> gobblin.runtime.AbstractJobLauncher  - Creating a job specific 
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.reflection.GobblinConstructorUtils  - Found accessible 
> constructor for class class gobblin.runtime.FsDatasetStateStore with 
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class 
> java.lang.String, class java.lang.Integer, class 
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:24:00 CST INFO  [GetFsDatasetStateStore-] 
> gobblin.runtime.FsDatasetStateStore$3$1  - Getting dataset states from: 
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@cfe774d
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@cfe774d
> 2017-06-06 17:24:00 CST WARN  [DefaultQuartzScheduler_Worker-2] 
> gobblin.runtime.JobContext  - Property writer.staging.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:24:00 CST WARN  [DefaultQuartzScheduler_Worker-2] 
> gobblin.runtime.JobContext  - Property writer.output.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:24:00 CST WARN  [DefaultQuartzScheduler_Worker-2] 
> gobblin.metrics.MetricContext$Builder  - MetricContext with specified name 
> already exists, appending UUID to the given name: 
> 5df4b7a6-c054-4f4b-8773-f535ad8bdf40
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@242e5a62[Terminated, pool size = 1, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@242e5a62[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.kafka.client.Kafka08ConsumerClient  - Fetching topic metadata from 
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Discovered topic test
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@50b7fe2f[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:24:00 CST INFO  [pool-43-thread-1] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunit for 
> partition test:0: lowWatermark=663501, highWatermark=664308, range=807
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@50b7fe2f[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunits for 1 
> topics in 0 seconds
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - Estimated avg time to pull a record for topic test is 0.482788 
> milliseconds
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - For all topics not pulled in the previous run, estimated avg time to pull 
> a record is 0.48278805120910384 milliseconds
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 1: estimated load=117.284284, partitions=[[test:0]]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 117.284284; 
> Diff = 99.997433%
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@32180870
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@32180870
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.runtime.AbstractJobLauncher  - Starting job 
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@4a2e171a[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@4a2e171a[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - runWorkUnits 
> 2017-06-06 17:24:00 CST INFO  [TaskStateCollectorService STARTING] 
> gobblin.runtime.TaskStateCollectorService  - Starting the 
> TaskStateCollectorService
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@5ac858a
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@5ac858a
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - Job queue GobblinKafkaQuickStart 
> already exists
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> org.apache.helix.task.TaskDriver  - Add job configuration 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> org.apache.helix.task.TaskUtil  - invoke rebalance for GobblinKafkaQuickStart
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - Submitted job 
> job_GobblinKafkaQuickStart_1496740860051 to Helix
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion...
> 2017-06-06 17:24:00 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState null 
> 2017-06-06 17:24:00 CST INFO  [Thread-10] 
> org.apache.helix.task.WorkflowRebalancer  - Job 
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 idealstate 
> already exists!
> 2017-06-06 17:24:01 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS
> 2017-06-06 17:24:02 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS
> ..............................................
> ..............................................
> ..............................................
> 2017-06-06 17:26:59 CST INFO  [DefaultQuartzScheduler_Worker-2] 
> gobblin.cluster.GobblinHelixJobLauncher  - wait for job completion - 
> helixJobState IN_PROGRESS 
> 2017-06-06 17:27:00 CST WARN  [DefaultQuartzScheduler_Worker-3] 
> gobblin.runtime.AbstractJobLauncher  - Creating a job specific 
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.reflection.GobblinConstructorUtils  - Found accessible 
> constructor for class class gobblin.runtime.FsDatasetStateStore with 
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class 
> java.lang.String, class java.lang.Integer, class 
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:27:00 CST INFO  [GetFsDatasetStateStore-] 
> gobblin.runtime.FsDatasetStateStore$3$1  - Getting dataset states from: 
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@49cb4dbd
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@49cb4dbd
> 2017-06-06 17:27:00 CST WARN  [DefaultQuartzScheduler_Worker-3] 
> gobblin.runtime.JobContext  - Property writer.staging.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:27:00 CST WARN  [DefaultQuartzScheduler_Worker-3] 
> gobblin.runtime.JobContext  - Property writer.output.dir is deprecated. No 
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@18918ad6[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@18918ad6[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.kafka.client.Kafka08ConsumerClient  - Fetching topic metadata from 
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Discovered topic test
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@30f8679c[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:27:00 CST INFO  [pool-51-thread-1] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunit for 
> partition test:0: lowWatermark=663501, highWatermark=664471, range=970
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@30f8679c[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.KafkaSource  - Created workunits for 1 
> topics in 0 seconds
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - Estimated avg time to pull a record for topic test is 0.482788 
> milliseconds
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
>   - For all topics not pulled in the previous run, estimated avg time to pull 
> a record is 0.48278805120910384 milliseconds
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 1: estimated load=140.973674, partitions=[[test:0]]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  - 
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 140.973674; 
> Diff = 99.997865%
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3c2db018
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3c2db018
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.runtime.AbstractJobLauncher  - Starting job 
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Attempting to shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@6cbc3980[Shutting down, pool size = 
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ExecutorsUtils  - Successfully shutdown ExecutorService: 
> java.util.concurrent.ThreadPoolExecutor@6cbc3980[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO  [DefaultQuartzScheduler_Worker-3] 
> gobblin.cluster.GobblinHelixJobLauncher  - runWorkUnits 
> 2017-06-06 17:27:00 CST INFO  [TaskStateCollectorService STARTING] 
> gobblin.runtime.TaskStateCollectorService  - Starting the 
> TaskStateCollectorService
> 2017-06-06 17:27:00 CST WARN  [TaskStateCollectorService RUNNING] 
> gobblin.runtime.TaskStateCollectorService  - No output task state files found 
> in 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_taskstates/job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:27:00 CST WARN  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ParallelRunner  - Task failed: Serialize state to store 
> job_GobblinKafkaQuickStart_1496740860051 file 
> multitask_GobblinKafkaQuickStart_1496740860051_0.mwu
> org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/_tmp_multitask_GobblinKafkaQuickStart_1496740860051_0.mwu
>  to 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/multitask_GobblinKafkaQuickStart_1496740860051_0.mwu:
>  dst already exists
>       at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
>       at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:201)
>       at gobblin.metastore.FsStateStore.put(FsStateStore.java:176)
>       at 
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:325)
>       at 
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:322)
>       at 
> gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> 2017-06-06 17:27:00 CST WARN  [DefaultQuartzScheduler_Worker-3] 
> gobblin.util.ParallelRunner  - Task failed: Serialize state to store 
> job_GobblinKafkaQuickStart_1496740860051 file 
> multitask_GobblinKafkaQuickStart_1496740860051_1.mwu
> org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/_tmp_multitask_GobblinKafkaQuickStart_1496740860051_1.mwu
>  to 
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/multitask_GobblinKafkaQuickStart_1496740860051_1.mwu:
>  dst already exists
>       at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
>       at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:201)
>       at gobblin.metastore.FsStateStore.put(FsStateStore.java:176)
>       at 
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:325)
>       at 
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:322)
>       at 
> gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> .............................................. 
>  
> *Github Url* : 
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306441710 
> ----
> [~phenixmzy] wrote on 2017-06-06T10:17:36Z : To print the IN_PROGRESS state 
> in the log, I modified 
> gobblin.cluster.GobblinHelixJobLauncher#waitForJobCompletion:
> private void waitForJobCompletion() throws InterruptedException {
>     LOGGER.info("wait for job completion...");
>     while (true) {
>       // Look up the workflow (job queue) context in Helix on every poll.
>       WorkflowContext workflowContext =
>           TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
>       if (workflowContext != null) {
>         org.apache.helix.task.TaskState helixJobState =
>             workflowContext.getJobState(this.jobResourceName);
>         if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
>             helixJobState == org.apache.helix.task.TaskState.FAILED ||
>             helixJobState == org.apache.helix.task.TaskState.STOPPED) {
>           // Terminal state: log it and stop waiting.
>           LOGGER.info(String.format("wait for job completion helixJobState %s",
>               helixJobState));
>           return;
>         } else {
>           // Non-terminal state (for example IN_PROGRESS or null): keep polling.
>           LOGGER.info(String.format("wait for job completion helixJobState %s",
>               helixJobState));
>         }
>       } else {
>         LOGGER.info("workflowContext is null");
>       }
>       Thread.sleep(1000);
>     }
>   } 
>  
> *Github Url* : 
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306443458 
> ----
> [~phenixmzy] wrote on 2017-06-10T14:10:25Z : I built this from Gobblin 0.10.0 
> against hadoop-2.6.5 with:
> ./gradlew clean build -PhadoopVersion=2.6.5 -PhiveVersion=1.2.1 -Pversion=hadoop-2.6.5-gobblin-0.10.0 -x test --stacktrace 
>  
> *Github Url* : 
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-307567413
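
In the quoted log, the 17:27 run starts job_GobblinKafkaQuickStart_1496740860051, the same job ID as the 17:24 run that is still reporting IN_PROGRESS, and the modified waitForJobCompletion polls in a while (true) loop with no exit for a job that never reaches a terminal state. As a diagnostic aid only, the wait could be bounded so that a stuck job surfaces as a timeout instead of an endless stream of IN_PROGRESS lines. The following is a minimal sketch of that idea, not Gobblin's or Helix's implementation: the BoundedWait class, its State enum (a stand-in for org.apache.helix.task.TaskState), and the timeout values are all hypothetical.

```java
import java.util.function.Supplier;

public class BoundedWait {
    // Hypothetical stand-in for org.apache.helix.task.TaskState; not the Helix enum.
    enum State { IN_PROGRESS, COMPLETED, FAILED, STOPPED }

    // Same terminal states the quoted waitForJobCompletion checks for.
    static boolean isTerminal(State s) {
        return s == State.COMPLETED || s == State.FAILED || s == State.STOPPED;
    }

    /**
     * Polls the supplier until it reports a terminal state or the timeout elapses.
     * Returns the last state seen, which may be non-terminal (or null) on timeout.
     */
    static State waitForTerminal(Supplier<State> poll, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        State last = poll.get();
        while (!isTerminal(last) && System.nanoTime() < deadline) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop waiting.
                Thread.currentThread().interrupt();
                return last;
            }
            last = poll.get();
        }
        return last;
    }

    public static void main(String[] args) {
        // A job that never leaves IN_PROGRESS: the wait gives up after the timeout.
        State stuck = waitForTerminal(() -> State.IN_PROGRESS, 50, 10);
        System.out.println("stuck job, last state seen: " + stuck);

        // A job that completes on the third poll.
        int[] polls = {0};
        State done = waitForTerminal(
            () -> ++polls[0] >= 3 ? State.COMPLETED : State.IN_PROGRESS, 1000, 5);
        System.out.println("finishing job, last state seen: " + done);
    }
}
```

A caller that gets a non-terminal state back knows the wait timed out and can log or alert on it. This does not explain why the Helix job stays IN_PROGRESS; it only makes the hang visible.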



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
