[
https://issues.apache.org/jira/browse/GOBBLIN-14?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Abhishek Tiwari reassigned GOBBLIN-14:
--------------------------------------
Assignee: Hung Tran
> In YARN mode, starting with the second batch, the helixTasks state is
> always IN_PROGRESS
> ------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-14
> URL: https://issues.apache.org/jira/browse/GOBBLIN-14
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: mazhiyong
> Assignee: Hung Tran
>
> I use Gobblin version 0.10.0.
> I have an application that reads data from Kafka and writes it to HDFS. When I
> run Gobblin on YARN, only the first batch is written to HDFS successfully; from
> the second batch on, the helixTasks always stay in the IN_PROGRESS state.
> The Kafka topic receives only a few thousand records per 10 minutes.
> Is this a bug?
>
> *Github Url* : https://github.com/linkedin/gobblin/issues/1926
> *Github Reporter* : [~phenixmzy]
> *Github Created At* : 2017-06-06T10:03:12Z
> *Github Updated At* : 2017-06-10T14:10:25Z
> h3. Comments
> ----
> [~phenixmzy] wrote on 2017-06-06T10:05:15Z : ## These are my config files
> ### **Config File - conf/yarn/reference.conf**
> gobblin.home.dir=/opt/gobblin-dist
> gobblin.yarn.app.queue=default
> gobblin.yarn.app.name=GobblinYarn
> gobblin.yarn.app.master.memory.mbs=512
> gobblin.yarn.app.master.cores=1
> gobblin.yarn.app.report.interval.minutes=5
> gobblin.yarn.max.get.app.report.failures=4
> gobblin.yarn.email.notification.on.shutdown=false
> gobblin.yarn.initial.containers=1
> gobblin.yarn.container.memory.mbs=512
> gobblin.yarn.container.cores=1
> gobblin.yarn.container.affinity.enabled=true
> gobblin.yarn.helix.instance.max.retries=2
> gobblin.yarn.keytab.login.interval.minutes=1440
> gobblin.yarn.token.renew.interval.minutes=720
> gobblin.yarn.work.dir=/gobblin
> zookeeper.connection.string=host-xxx-32-201.meizu.com:2181,host-xxx-32-202.meizu.com:2181,host-xxx-32-203.meizu.com:2181
> zookeeper.session.timeout.seconds=180
> zookeeper.connection.timeout.seconds=30
> zookeeper.retry.backoff.seconds=1
> zookeeper.retry.count.max=10
> gobblin.cluster.helix.cluster.name=GobblinYarn
> gobblin.cluster.zk.connection.string=${zookeeper.connection.string}
> fs.uri=hdfs://xxxcluster
> job.execinfo.server.enabled=false
>
> ### **Config File - conf/yarn/application.conf**
> gobblin.yarn.app.name=GobblinYarnTest
> gobblin.yarn.app.master.memory.mbs=256
> gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}/log4j-yarn.properties,${gobblin.yarn.conf.dir}/application.conf,${gobblin.yarn.conf.dir}/reference.conf
> gobblin.yarn.initial.containers=2
> gobblin.yarn.container.memory.mbs=512
> gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
> gobblin.yarn.container.affinity.enabled=true
> gobblin.yarn.helix.instance.max.retries=2
> gobblin.yarn.conf.dir=${gobblin.home.dir}/conf/yarn
> gobblin.yarn.lib.jars.dir=${gobblin.home.dir}/lib
> gobblin.yarn.logs.sink.root.dir=${gobblin.home.dir}/applogs
> gobblin.cluster.helix.cluster.name=GobblinYarnTest
> gobblin.cluster.job.conf.path=${gobblin.home.dir}/jobconf
> writer.fs.uri=${fs.uri}
> state.store.fs.uri=${fs.uri}
> writer.destination.type=HDFS
> writer.output.format=AVRO
> writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
> writer.output.dir=${gobblin.yarn.work.dir}/task-output
> data.publisher.type=gobblin.publisher.BaseDataPublisher
> data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
> data.publisher.replace.final.dir=false
> state.store.dir=${gobblin.yarn.work.dir}/state-store
> qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
> job.lock.enabled=true
> job.lock.type=gobblin.runtime.locks.ZookeeperBasedJobLock
> job.lock.dir=${gobblin.yarn.work.dir}/locks
> metrics.log.dir=${gobblin.yarn.work.dir}/metrics
> metrics.enabled=true
>
> ### **Config File - kafka-hdfs-on-yarn.pull**
> fs.uri=hdfs://xxxcluster
> mr.job.max.mappers=10
> mr.job.root.dir=${gobblin.yarn.work.dir}/working
> topic.whitelist=test
> kafka.brokers=host-xxx-32-201:9092,host-xxx-32-202:9092,host-xxx-32-203:9092
> bootstrap.with.offset=earliest
> job.name=GobblinKafkaQuickStart
> job.group=GobblinKafka
> job.description=Gobblin quick start job for Kafka
> job.lock.enabled=false
> job.schedule=0 0/3 * * * ?
> source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
> extract.namespace=gobblin.extract.kafka
> writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
> writer.file.path.type=tablename
> writer.destination.type=HDFS
> writer.output.format=txt
> writer.fs.uri=${fs.uri}
> metrics.reporting.file.enabled=true
> metrics.log.dir=${gobblin.yarn.work.dir}/metrics
> metrics.reporting.file.suffix=txt
> data.publisher.type=gobblin.publisher.BaseDataPublisher
> data.publisher.final.dir=/gobblintest/job-output
> data.publisher.replace.final.dir=false
> state.store.fs.uri=${fs.uri}
> state.store.dir=${gobblin.yarn.work.dir}/state-store
> task.data.root.dir=${gobblin.yarn.work.dir}/task-data
>
> *Github Url* :
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306440651
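The job file kafka-hdfs-on-yarn.pull references `${gobblin.yarn.work.dir}` but does not define that key itself (it is defined in conf/yarn/reference.conf), and the log in this thread prints the state-store path with the literal `${gobblin.yarn.work.dir}` still in it. The following is a minimal Python sketch, assuming plain `key=value` lines, that lists placeholders a single file references without defining them. The helper names `parse_properties` and `undefined_references` are hypothetical and not part of Gobblin. The check does not determine how Gobblin resolves references across files or whether this is related to the reported behaviour.

```python
import re

# Matches ${some.key} placeholders inside a property value.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse simple key=value lines; blank lines and '#' comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def undefined_references(props):
    """Map each placeholder key that is not defined in props to the keys using it."""
    missing = {}
    for key, value in props.items():
        for ref in PLACEHOLDER.findall(value):
            if ref not in props:
                missing.setdefault(ref, []).append(key)
    return missing


# Excerpt of the .pull file quoted above (only lines that matter for the check).
PULL_FILE = """\
fs.uri=hdfs://xxxcluster
mr.job.root.dir=${gobblin.yarn.work.dir}/working
writer.fs.uri=${fs.uri}
state.store.dir=${gobblin.yarn.work.dir}/state-store
task.data.root.dir=${gobblin.yarn.work.dir}/task-data
"""

report = undefined_references(parse_properties(PULL_FILE))
for ref, users in sorted(report.items()):
    print(f"{ref} is referenced by {', '.join(users)} but not defined in this file")
```

Running the same check over the concatenation of reference.conf, application.conf and the job file would show whether every placeholder has a definition somewhere in the three files.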
> ----
> [~phenixmzy] wrote on 2017-06-06T10:09:48Z : ## The Log
> 2017-06-06 17:18:47 CST INFO [SchedulerService STARTING]
> org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler
> (v2.2.3) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
> Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
> NOT STARTED.
> Currently in standby mode.
> Number of jobs executed: 0
> Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
> Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support
> persistence. and is not clustered.
> 2017-06-06 17:18:47 CST INFO [SchedulerService STARTING]
> org.quartz.impl.StdSchedulerFactory - Quartz scheduler
> 'DefaultQuartzScheduler' initialized from default resource file in Quartz
> package: 'quartz.properties'
> 2017-06-06 17:18:47 CST INFO [SchedulerService STARTING]
> org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.2.3
> 2017-06-06 17:18:47 CST INFO [SchedulerService STARTING]
> org.quartz.core.QuartzScheduler - Scheduler
> DefaultQuartzScheduler_$_NON_CLUSTERED started.
> 2017-06-06 17:18:47 CST INFO [YarnService STARTING]
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing
> over to rm2
> 2017-06-06 17:18:47 CST INFO [JobConfigurationManager STARTING]
> gobblin.cluster.JobConfigurationManager - Loaded 1 job configuration(s)
> 2017-06-06 17:18:47 CST INFO [JobConfigurationManager STARTING]
> gobblin.cluster.JobConfigurationManager - Posting new JobConfig with name:
> GobblinKafkaQuickStart and config: {extract.namespace=gobblin.extract.kafka,
> ......
> 2017-06-06 17:18:47 CST INFO [JobConfigurationManager STARTING]
> gobblin.cluster.GobblinHelixJobScheduler - Received new job configuration of
> job GobblinKafkaQuickStart
> 2017-06-06 17:18:47 CST INFO [JobConfigurationManager STARTING]
> gobblin.cluster.GobblinHelixJobScheduler - Scheduling job
> GobblinKafkaQuickStart
> 2017-06-06 17:18:47 CST INFO [JobConfigurationManager STARTING]
> gobblin.scheduler.JobScheduler - Scheduled job
> GobblinKafka.GobblinKafkaQuickStart. Next run: Tue Jun 06 17:21:00 CST 2017.
> 2017-06-06 17:18:47 CST INFO [YarnService STARTING] gobblin.yarn.YarnService
> - ApplicationMaster registration response: maximumCapability { memory: 5120
> virtual_cores: 6 } queue: default scheduler_resource_types: MEMORY
> 2017-06-06 17:18:47 CST INFO [YarnService STARTING] gobblin.yarn.YarnService
> - Requesting initial containers
> 2017-06-06 17:18:49 CST INFO [AMRM Heartbeater thread]
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Received new token
> for : mz-bigdata-32-227.meizu.com:45454
> 2017-06-06 17:18:49 CST INFO [AMRM Heartbeater thread]
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Received new token
> for : mz-bigdata-32-226.meizu.com:45454
> 2017-06-06 17:18:49 CST INFO [AMRM Callback Handler Thread]
> gobblin.yarn.YarnService$AMRMClientCallbackHandler - Container
> container_e46_1496740629024_0001_01_000002 has been allocated
> 2017-06-06 17:18:49 CST INFO [AMRM Callback Handler Thread]
> gobblin.yarn.YarnService$AMRMClientCallbackHandler - Container
> container_e46_1496740629024_0001_01_000003 has been allocated
> 2017-06-06 17:18:49 CST INFO [ContainerLaunchExecutor]
> gobblin.yarn.YarnService$AMRMClientCallbackHandler$1 - Starting container
> container_e46_1496740629024_0001_01_000002
> 2017-06-06 17:18:49 CST INFO [ContainerLaunchExecutor]
> gobblin.yarn.YarnService$AMRMClientCallbackHandler$1 - Starting container
> container_e46_1496740629024_0001_01_000003
> 2017-06-06 17:18:50 CST WARN [ContainerLaunchExecutor]
> gobblin.yarn.YarnService - Path
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/container/_appjars
> does not exist so no container LocalResource to add
> 2017-06-06 17:18:50 CST WARN [ContainerLaunchExecutor]
> gobblin.yarn.YarnService - Path
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/container/_appjars
> does not exist so no container LocalResource to add
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0]
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor
> - Processing Event EventType: START_CONTAINER for Container
> container_e46_1496740629024_0001_01_000003
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1]
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor
> - Processing Event EventType: START_CONTAINER for Container
> container_e46_1496740629024_0001_01_000002
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1]
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy$ContainerManagementProtocolProxyData
> - Opening proxy : mz-bigdata-32-227.meizu.com:45454
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0]
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy$ContainerManagementProtocolProxyData
> - Opening proxy : mz-bigdata-32-226.meizu.com:45454
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1]
> gobblin.yarn.YarnService$NMClientCallbackHandler - Container
> container_e46_1496740629024_0001_01_000002 has been started
> 2017-06-06 17:18:50 CST INFO
> [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #0]
> gobblin.yarn.YarnService$NMClientCallbackHandler - Container
> container_e46_1496740629024_0001_01_000003 has been started
> 2017-06-06 17:19:05 CST INFO
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
> gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener -
> Live Helix participant instance: GobblinYarnTaskRunner_1
> 2017-06-06 17:19:07 CST INFO
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
> gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener -
> Live Helix participant instance: GobblinYarnTaskRunner_1
> 2017-06-06 17:19:07 CST INFO
> [ZkClient-EventThread-25-mz-bigdata-32-201.meizu.com:2181,mz-bigdata-32-202.meizu.com:2181,mz-bigdata-32-203.meizu.com:2181]
> gobblin.cluster.GobblinClusterManager$GobblinLiveInstanceChangeListener -
> Live Helix participant instance: GobblinYarnTaskRunner_2
> 2017-06-06 17:21:00 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ClustersNames - no default cluster mapping found
> 2017-06-06 17:21:00 CST WARN [DefaultQuartzScheduler_Worker-1]
> gobblin.runtime.AbstractJobLauncher - Creating a job specific
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:21:00 CST INFO [DefaultQuartzScheduler_Worker-1]
> org.reflections.Reflections - Reflections took 657 ms to scan 43 urls,
> producing 645 keys and 2315 values
> 2017-06-06 17:21:00 CST INFO [DefaultQuartzScheduler_Worker-1]
> org.apache.hadoop.conf.Configuration - user.name is deprecated. Instead, use
> mapreduce.job.user.name
> 2017-06-06 17:21:00 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.reflection.GobblinConstructorUtils - Found accessible
> constructor for class class gobblin.runtime.FsDatasetStateStore with
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class
> java.lang.String, class java.lang.Integer, class
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> gobblin.runtime.FsDatasetStateStore$3$1 - Getting dataset states from:
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> org.apache.hadoop.io.compress.zlib.ZlibFactory - Successfully loaded &
> initialized native-zlib library
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor
> [.deflate]
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor
> [.deflate]
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor
> [.deflate]
> 2017-06-06 17:21:00 CST INFO [GetFsDatasetStateStore-]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor
> [.deflate]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@4c9bb974
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@4c9bb974
> 2017-06-06 17:21:01 CST WARN [DefaultQuartzScheduler_Worker-1]
> gobblin.runtime.JobContext - Property writer.staging.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:21:01 CST WARN [DefaultQuartzScheduler_Worker-1]
> gobblin.runtime.JobContext - Property writer.output.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@3c553a00[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@3c553a00[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.kafka.client.Kafka08ConsumerClient - Fetching topic metadata from
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.KafkaSource - Discovered topic test
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@74b1f437[Shutting down, pool size =
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:21:01 CST INFO [pool-29-thread-1]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunit for
> partition test:0: lowWatermark=659986, highWatermark=663501, range=3515
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@74b1f437[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunits for 1
> topics in 0 seconds
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - Estimated avg time to pull a record for topic test is 0.146157
> milliseconds
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - For all topics not pulled in the previous run, estimated avg time to pull
> a record is 0.1461565963532356 milliseconds
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 1: estimated load=154.651281, partitions=[[test:0]]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:21:01 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 154.651281;
> Diff = 99.998053%
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@1311b3f7
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@1311b3f7
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.runtime.AbstractJobLauncher - Starting job
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@53580f39[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@53580f39[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - runWorkUnits
> 2017-06-06 17:21:02 CST INFO [TaskStateCollectorService STARTING]
> gobblin.runtime.TaskStateCollectorService - Starting the
> TaskStateCollectorService
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [ParallelRunner]
> org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.deflate]
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3b33222d
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3b33222d
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> org.apache.helix.task.TaskDriver - Starting workflow GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - Created job queue
> GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> org.apache.helix.task.TaskDriver - Add job configuration
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> org.apache.helix.task.TaskUtil - invoke rebalance for GobblinKafkaQuickStart
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - Submitted job
> job_GobblinKafkaQuickStart_1496740860051 to Helix
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion...
> 2017-06-06 17:21:02 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - workflowContext is null.
> 2017-06-06 17:21:02 CST INFO [Thread-10]
> org.apache.helix.task.WorkflowRebalancer - Job
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 idealstate
> already exists!
> 2017-06-06 17:21:03 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> 2017-06-06 17:21:04 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> 2017-06-06 17:21:05 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> 2017-06-06 17:21:06 CST INFO [Thread-10]
> org.apache.helix.task.TaskRebalancer - Cleaning up idealstate and
> externalView for job:
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO [Thread-10]
> org.apache.helix.task.TaskRebalancer - Successfully clean up
> idealstate/externalView for resource
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051.
> 2017-06-06 17:21:06 CST INFO [Thread-10] org.apache.helix.task.JobRebalancer
> - Workflow GobblinKafkaQuickStart or job
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 is already
> failed or completed, workflow state (IN_PROGRESS), job state (COMPLETED),
> clean up job IS.
> 2017-06-06 17:21:06 CST INFO [Thread-10]
> org.apache.helix.task.TaskRebalancer - Cleaning up idealstate and
> externalView for job:
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO [Thread-10]
> org.apache.helix.task.TaskRebalancer - Successfully clean up
> idealstate/externalView for resource
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051.
> 2017-06-06 17:21:06 CST WARN [Thread-10] org.apache.helix.model.IdealState
> - Resource
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_9 does
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST WARN [Thread-10] org.apache.helix.model.IdealState
> - Resource
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_4 does
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST WARN [Thread-10] org.apache.helix.model.IdealState
> - Resource
> key:GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051_7 does
> not have a pre-computed preference list.
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion
> helixJobState COMPLETED
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - Job
> job_GobblinKafkaQuickStart_1496740860051 completed
> 2017-06-06 17:21:06 CST INFO [TaskStateCollectorService STOPPING]
> gobblin.runtime.TaskStateCollectorService - Stopping the
> TaskStateCollectorService
> 2017-06-06 17:21:06 CST INFO [TaskStateCollectorService STOPPING]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@2aca135b
> 2017-06-06 17:21:06 CST INFO [TaskStateCollectorService STOPPING]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@2aca135b
> 2017-06-06 17:21:06 CST INFO [TaskStateCollectorService STOPPING]
> gobblin.runtime.TaskStateCollectorService - Collected task state of 1
> completed tasks
> 2017-06-06 17:21:06 CST INFO [TaskStateCollectorService STOPPING]
> gobblin.runtime.JobContext - 1 more tasks of job
> job_GobblinKafkaQuickStart_1496740860051 have completed
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.cluster.GobblinHelixJobLauncher - Deleting persisted work units for
> job job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.runtime.JobContext - Persisting dataset urns.
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.reflection.GobblinConstructorUtils - Found accessible
> constructor for class class
> gobblin.metastore.nameParser.SimpleDatasetUrnStateStoreNameParser with
> parameter types [].
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0]
> gobblin.runtime.SafeDatasetCommit - Committing dataset of job
> job_GobblinKafkaQuickStart_1496740860051 with commit policy
> COMMIT_ON_FULL_SUCCESS and state SUCCESSFUL
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0]
> gobblin.publisher.BaseDataPublisher - Moving
> hdfs://mzcluster/gobblin/task-output/test/part.task_GobblinKafkaQuickStart_1496740860051_0.txt
> to
> /gobblintest/job-output/test/part.task_GobblinKafkaQuickStart_1496740860051_0.txt
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0] gobblin.util.ExecutorsUtils
> - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@197d7a98
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0] gobblin.util.ExecutorsUtils
> - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@197d7a98
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0]
> gobblin.runtime.SafeDatasetCommit - Persisting dataset state for dataset
> 2017-06-06 17:21:06 CST INFO [Commit-thread-0]
> gobblin.runtime.FsDatasetStateStore - Persisting
> job_GobblinKafkaQuickStart_1496740860051.jst to the job state store
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@43cfed92
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@43cfed92
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.JobLauncherUtils - Cleaning up staging directory
> /gobblin/task-staging/test
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.JobLauncherUtils - Cleaning up output directory
> /gobblin/task-output/test
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@144164c3
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@144164c3
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@25ca5791[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:21:06 CST INFO [DefaultQuartzScheduler_Worker-1]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@25ca5791[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST WARN [DefaultQuartzScheduler_Worker-2]
> gobblin.runtime.AbstractJobLauncher - Creating a job specific
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.reflection.GobblinConstructorUtils - Found accessible
> constructor for class class gobblin.runtime.FsDatasetStateStore with
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class
> java.lang.String, class java.lang.Integer, class
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:24:00 CST INFO [GetFsDatasetStateStore-]
> gobblin.runtime.FsDatasetStateStore$3$1 - Getting dataset states from:
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@cfe774d
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@cfe774d
> 2017-06-06 17:24:00 CST WARN [DefaultQuartzScheduler_Worker-2]
> gobblin.runtime.JobContext - Property writer.staging.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:24:00 CST WARN [DefaultQuartzScheduler_Worker-2]
> gobblin.runtime.JobContext - Property writer.output.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:24:00 CST WARN [DefaultQuartzScheduler_Worker-2]
> gobblin.metrics.MetricContext$Builder - MetricContext with specified name
> already exists, appending UUID to the given name:
> 5df4b7a6-c054-4f4b-8773-f535ad8bdf40
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@242e5a62[Terminated, pool size = 1,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@242e5a62[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.kafka.client.Kafka08ConsumerClient - Fetching topic metadata from
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.KafkaSource - Discovered topic test
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@50b7fe2f[Shutting down, pool size =
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:24:00 CST INFO [pool-43-thread-1]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunit for
> partition test:0: lowWatermark=663501, highWatermark=664308, range=807
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@50b7fe2f[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunits for 1
> topics in 0 seconds
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - Estimated avg time to pull a record for topic test is 0.482788
> milliseconds
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - For all topics not pulled in the previous run, estimated avg time to pull
> a record is 0.48278805120910384 milliseconds
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 1: estimated load=117.284284, partitions=[[test:0]]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 117.284284;
> Diff = 99.997433%
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@32180870
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@32180870
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.runtime.AbstractJobLauncher - Starting job
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@4a2e171a[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@4a2e171a[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - runWorkUnits
> 2017-06-06 17:24:00 CST INFO [TaskStateCollectorService STARTING]
> gobblin.runtime.TaskStateCollectorService - Starting the
> TaskStateCollectorService
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@5ac858a
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@5ac858a
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - Job queue GobblinKafkaQuickStart
> already exists
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> org.apache.helix.task.TaskDriver - Add job configuration
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> org.apache.helix.task.TaskUtil - invoke rebalance for GobblinKafkaQuickStart
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - Submitted job
> job_GobblinKafkaQuickStart_1496740860051 to Helix
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion...
> 2017-06-06 17:24:00 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState null
> 2017-06-06 17:24:00 CST INFO [Thread-10]
> org.apache.helix.task.WorkflowRebalancer - Job
> GobblinKafkaQuickStart_job_GobblinKafkaQuickStart_1496740860051 idealstate
> already exists!
> 2017-06-06 17:24:01 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> 2017-06-06 17:24:02 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> ..............................................
> ..............................................
> ..............................................
> 2017-06-06 17:26:59 CST INFO [DefaultQuartzScheduler_Worker-2]
> gobblin.cluster.GobblinHelixJobLauncher - wait for job completion -
> helixJobState IN_PROGRESS
> 2017-06-06 17:27:00 CST WARN [DefaultQuartzScheduler_Worker-3]
> gobblin.runtime.AbstractJobLauncher - Creating a job specific
> SharedResourcesBroker. Objects will only be shared at the job level.
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.reflection.GobblinConstructorUtils - Found accessible
> constructor for class class gobblin.runtime.FsDatasetStateStore with
> parameter types [class org.apache.hadoop.hdfs.DistributedFileSystem, class
> java.lang.String, class java.lang.Integer, class
> com.google.common.cache.LocalCache$LocalLoadingCache].
> 2017-06-06 17:27:00 CST INFO [GetFsDatasetStateStore-]
> gobblin.runtime.FsDatasetStateStore$3$1 - Getting dataset states from:
> hdfs://mzcluster/user/yarn/${gobblin.yarn.work.dir}/state-store/GobblinKafkaQuickStart/current.jst
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@49cb4dbd
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@49cb4dbd
> 2017-06-06 17:27:00 CST WARN [DefaultQuartzScheduler_Worker-3]
> gobblin.runtime.JobContext - Property writer.staging.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:27:00 CST WARN [DefaultQuartzScheduler_Worker-3]
> gobblin.runtime.JobContext - Property writer.output.dir is deprecated. No
> need to use it if task.data.root.dir is specified.
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@18918ad6[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@18918ad6[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.kafka.client.Kafka08ConsumerClient - Fetching topic metadata from
> broker mz-bigdata-32-201.meizu.com:9092
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.KafkaSource - Discovered topic test
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@30f8679c[Shutting down, pool size =
> 1, active threads = 1, queued tasks = 0, completed tasks = 0]
> 2017-06-06 17:27:00 CST INFO [pool-51-thread-1]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunit for
> partition test:0: lowWatermark=663501, highWatermark=664471, range=970
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@30f8679c[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.KafkaSource - Created workunits for 1
> topics in 0 seconds
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - Estimated avg time to pull a record for topic test is 0.482788
> milliseconds
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator
> - For all topics not pulled in the previous run, estimated avg time to pull
> a record is 0.48278805120910384 milliseconds
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 0: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 1: estimated load=140.973674, partitions=[[test:0]]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 2: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 3: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 4: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 5: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 6: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 7: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 8: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> MultiWorkUnit 9: estimated load=0.003010, partitions=[]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker -
> Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 140.973674;
> Diff = 99.997865%
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3c2db018
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@3c2db018
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.runtime.AbstractJobLauncher - Starting job
> job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6cbc3980[Shutting down, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ExecutorsUtils - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6cbc3980[Terminated, pool size = 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-06-06 17:27:00 CST INFO [DefaultQuartzScheduler_Worker-3]
> gobblin.cluster.GobblinHelixJobLauncher - runWorkUnits
> 2017-06-06 17:27:00 CST INFO [TaskStateCollectorService STARTING]
> gobblin.runtime.TaskStateCollectorService - Starting the
> TaskStateCollectorService
> 2017-06-06 17:27:00 CST WARN [TaskStateCollectorService RUNNING]
> gobblin.runtime.TaskStateCollectorService - No output task state files found
> in
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_taskstates/job_GobblinKafkaQuickStart_1496740860051
> 2017-06-06 17:27:00 CST WARN [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ParallelRunner - Task failed: Serialize state to store
> job_GobblinKafkaQuickStart_1496740860051 file
> multitask_GobblinKafkaQuickStart_1496740860051_0.mwu
> org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/_tmp_multitask_GobblinKafkaQuickStart_1496740860051_0.mwu
> to
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/multitask_GobblinKafkaQuickStart_1496740860051_0.mwu:
> dst already exists
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:201)
> at gobblin.metastore.FsStateStore.put(FsStateStore.java:176)
> at
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:325)
> at
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:322)
> at
> gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 2017-06-06 17:27:00 CST WARN [DefaultQuartzScheduler_Worker-3]
> gobblin.util.ParallelRunner - Task failed: Serialize state to store
> job_GobblinKafkaQuickStart_1496740860051 file
> multitask_GobblinKafkaQuickStart_1496740860051_1.mwu
> org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/_tmp_multitask_GobblinKafkaQuickStart_1496740860051_1.mwu
> to
> hdfs://mzcluster/user/yarn/GobblinYarnTest/application_1496740629024_0001/_workunits/job_GobblinKafkaQuickStart_1496740860051/multitask_GobblinKafkaQuickStart_1496740860051_1.mwu:
> dst already exists
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:219)
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:201)
> at gobblin.metastore.FsStateStore.put(FsStateStore.java:176)
> at
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:325)
> at
> gobblin.cluster.GobblinHelixJobLauncher$2.call(GobblinHelixJobLauncher.java:322)
> at
> gobblin.util.executors.MDCPropagatingCallable.call(MDCPropagatingCallable.java:38)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> ..............................................
>
> *Github Url* :
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306441710
> ----
> [~phenixmzy] wrote on 2017-06-06T10:17:36Z : To print the IN_PROGRESS state
> in the log, I modified
> gobblin.cluster.GobblinHelixJobLauncher#waitForJobCompletion as follows:
> private void waitForJobCompletion() throws InterruptedException {
>   LOGGER.info("wait for job completion...");
>   while (true) {
>     WorkflowContext workflowContext =
>         TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
>     if (workflowContext != null) {
>       org.apache.helix.task.TaskState helixJobState =
>           workflowContext.getJobState(this.jobResourceName);
>       if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
>           helixJobState == org.apache.helix.task.TaskState.FAILED ||
>           helixJobState == org.apache.helix.task.TaskState.STOPPED) {
>         // Terminal state: log it and stop waiting.
>         LOGGER.info(String.format("wait for job completion helixJobState %s",
>             helixJobState));
>         return;
>       } else {
>         // Non-terminal state (e.g. IN_PROGRESS or null): log it and keep polling.
>         LOGGER.info(String.format("wait for job completion helixJobState %s",
>             helixJobState));
>       }
>     } else {
>       LOGGER.info("workflowContext is null");
>     }
>     // Poll once per second.
>     Thread.sleep(1000);
>   }
> }
>
> *Github Url* :
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-306443458
> ----
> [~phenixmzy] wrote on 2017-06-10T14:10:25Z : I built it from Gobblin 0.10.0
> against hadoop-2.6.5:
> ./gradlew clean build -PhadoopVersion=2.6.5 -PhiveVersion=1.2.1
> -Pversion=hadoop-2.6.5-gobblin-0.10.0 -x test --stacktrace
>
> *Github Url* :
> https://github.com/linkedin/gobblin/issues/1926#issuecomment-307567413