[ https://issues.apache.org/jira/browse/FLINK-8244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhanglibing000 updated FLINK-8244:
----------------------------------
    Description: 
I want to use the Flink HBase TableInputFormat API to read data from HBase. This 
works when I run my code in local mode, but when I run it in Flink YARN session 
mode something goes wrong in the task manager: there is no error message, yet no 
data comes out of the data source. I finally noticed that the ZooKeeper client 
is created twice, as the task manager log below shows, so I think there is a 
problem in Flink.
Thanks.


{code:java}
import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import com.demo.shmetro.RuleEnginerStreamingFlinkApp.logger
import com.demo.shmetro.bean.json.JsonBeanGenerator
import com.demo.shmetro.model.TrainData
import com.demo.shmetro.output.GlobalEntity
import com.demo.shmetro.rules.{BatchRuleEngineerByScannerExecutor, 
RuleEngineerByRedisExecutor}
import com.demo.shmetro.source.HBaseTableInputFormat
import com.demo.shmetro.util.RedisUtil
import com.demo.shmetro.utils.{MysqlUtil}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator, 
MapOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConversions._

object RuleEnginerBatchFlinkAppCopy {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // `params` was used but never defined in the original snippet; presumably
    // it is parsed from the command-line arguments like this:
    val params = ParameterTool.fromArgs(args)

    env.setParallelism(1)
    env.getConfig.setGlobalJobParameters(params)

    val appName = params.get("batch.name")
    val updateTime = params.getLong("update.time")
    val tableName = params.get("hbase.table.name")
    val columnFamily = params.get("hbase.table.cf")
    val columns = params.get("hbase.table.columns").split(",")
    val startTime = params.get("start.time")
    val endTime = params.get("end.time")
    val trainNo = params.get("train.no")
    val startRow = new StringBuilder
    startRow.append(trainNo).append("\0").append(startTime)
    val endRow = new StringBuilder
    endRow.append(trainNo).append("\0").append(endTime)

    val hBaseDataSource: DataSource[Tuple2[String, String]] =
      env.createInput(new HBaseTableInputFormat[Tuple2[String, String]](
          tableName, columnFamily, columns, null, startRow.toString(), endRow.toString()) {
      private val reuse = new Tuple2[String, String]

      override
      protected def mapResultToTuple(r: Result): Tuple2[String, String] = {
        logger.error("**********hbase row: " + reuse)
        val key = Bytes.toString(r.getRow)
        val value = getMapResult(r)
        reuse.setField(key, 0)
        val data = value.get(key)
        reuse.setField(JSON.toJSON(data).toString, 1)
        return reuse
      }
    })
    hBaseDataSource.collect()
  }

}
{code}


{code:java}
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.addons.hbase.AbstractTableInputFormat;
    import org.apache.flink.addons.hbase.TableInputFormat;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public abstract class HBaseTableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {

        private String tableName;
        private String columnFamily;
        private String[] columns;
        private String filterName;
        private String startRow;
        private String stopRow;

        public HBaseTableInputFormat(String tableName, String columnFamily, String[] columns, String filterName, String startRow, String stopRow) {
            this.tableName = tableName;
            this.columnFamily = columnFamily;
            this.columns = columns;
            this.filterName = filterName;
            this.startRow = startRow;
            this.stopRow = stopRow;
        }

        protected Scan getScanner() {
            Scan scan = new Scan();
            if (!StringUtils.isEmpty(columnFamily)) {
                scan.addFamily(columnFamily.getBytes());
            }
            if (!StringUtils.isEmpty(filterName)) {
                Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(filterName));
                scan.setFilter(filter);
            }
            if (columns != null && !StringUtils.isEmpty(columnFamily)) {
                for (String column : columns) {
                    scan.addColumn(columnFamily.getBytes(), column.getBytes());
                }
            }
            if (!StringUtils.isEmpty(startRow)) {
                scan.setStartRow(startRow.getBytes());
            }
            if (!StringUtils.isEmpty(stopRow)) {
                scan.setStopRow(stopRow.getBytes());
            }
            return scan;
        }

        protected String getTableName() {
            return tableName;
        }

        protected abstract T mapResultToTuple(Result r);

        @Override
        public void configure(Configuration parameters) {
            table = createTable();
            if (table != null) {
                scan = getScanner();
            }
        }

        private HTable createTable() {
            LOG.info("Initializing HBaseConfiguration");
            org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
            configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
            configuration.set("hbase.zookeeper.quorum", "x.x.x.x");
            configuration.set("hbase.master.info.port", "2181");
            configuration.set("hbase.master", "172.17.1.21:60000");
            configuration.setInt("hbase.rpc.timeout", 20000);
            configuration.setInt("hbase.client.operation.timeout", 30000);

            try {
                return new HTable(configuration, getTableName());
            } catch (Exception e) {
                LOG.error("Error instantiating a new HTable instance", e);
            }
            return null;
        }

        protected T mapResultToOutType(Result r) {
            return mapResultToTuple(r);
        }

        public Map<String, Map<String, String>> getMapResult(Result result) {

            Map<String, Map<String, String>> resMap = new HashMap<>();
            Cell[] cells = result.rawCells();
            Map<String, String> cellMap = new HashMap<>();
            for (Cell cell : cells) {
                cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), 
Bytes.toString(CellUtil.cloneValue(cell)));
            }
            resMap.put(Bytes.toString(result.getRow()), cellMap);
            return resMap;
        }

    }

{code}
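
To narrow down where the second ZooKeeper client could come from, a small 
diagnostic input format can log every lifecycle call together with the JVM it 
runs in (this is only my own sketch, not part of the job above). As far as I 
can tell, in the DataSet API configure() is invoked on the master when the 
input splits are created and again in each parallel task before open(), so 
anything that opens a connection inside configure(), as createTable() does 
above, can run more than once:

{code:java}
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Diagnostic only: logs where and how often the InputFormat lifecycle methods run. */
public class LifecycleProbeInputFormat extends GenericInputFormat<String> {

    // Static, so it counts calls per JVM/classloader, not per instance.
    private static final AtomicInteger CONFIGURE_CALLS = new AtomicInteger();

    private boolean emitted;

    @Override
    public void configure(Configuration parameters) {
        // If a connection were opened here (like createTable() above), it would
        // be created once per configure() call, i.e. possibly more than once.
        System.err.println("configure() call #" + CONFIGURE_CALLS.incrementAndGet()
                + " in JVM " + ManagementFactory.getRuntimeMXBean().getName());
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        emitted = false;
        System.err.println("open() for split " + split.getSplitNumber()
                + " in JVM " + ManagementFactory.getRuntimeMXBean().getName());
    }

    @Override
    public boolean reachedEnd() {
        return emitted;
    }

    @Override
    public String nextRecord(String reuse) {
        emitted = true;
        return "probe";
    }
}
{code}

That said, both "Initiating client connection" lines in the log below share the 
same thread, the same watcher (hconnection-0x4ad59d510x0) and the same session 
id (0x2604070bd951a5e), so before blaming a second connection I probably also 
need to rule out a duplicated log appender for org.apache.zookeeper.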

{panel:title=Task manager log}
2017-12-12 20:18:23.968 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory 
/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/flink-dist-cache-2bd20058-9c1a-433b-aa35-e610342aedea
2017-12-12 20:18:23.979 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Starting TaskManager actor at 
akka://flink/user/taskmanager#1035429767.
2017-12-12 20:18:23.980 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - TaskManager data connection 
information: container_1513004072194_0025_01_000002 @ ml41.mlamp.co 
(dataPort=10331)
2017-12-12 20:18:23.981 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - TaskManager has 1 task slot(s).
2017-12-12 20:18:23.982 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Memory usage stats: [HEAP: 
426/2944/2944 MB, NON HEAP: 39/40/-1 MB (used/committed/max)]
2017-12-12 20:18:23.988 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Trying to register at JobManager 
akka.tcp://[email protected]:6542/user/jobmanager (attempt 1, timeout: 500 
milliseconds)
2017-12-12 20:18:24.163 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Successful registration at JobManager 
(akka.tcp://[email protected]:6542/user/jobmanager), starting network stack 
and library cache.
2017-12-12 20:18:24.169 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Determined BLOB server address to be 
ml42.mlamp.co/172.17.1.19:9593. Starting BLOB cache.
2017-12-12 20:18:24.172 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory 
/tmp/blobStore-a7b202b0-477f-40e5-a69e-b04bccb28acc
2017-12-12 20:27:04.138 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Received task DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
2017-12-12 20:27:04.139 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) switched from CREATED to DEPLOYING.
2017-12-12 20:27:04.140 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak 
safety net for task DataSource (at createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) [DEPLOYING]
2017-12-12 20:27:04.146 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
DataSource (at createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
2017-12-12 20:27:04.148 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.blob.BlobCache  - Downloading 
15eb13effcf291931c0e661c53c3af04e3b63b78 from ml42.mlamp.co/172.17.1.19:9593
2017-12-12 20:27:04.171 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Registering task at network: 
DataSource (at createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
2017-12-12 20:27:04.178 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) switched from DEPLOYING to RUNNING.
2017-12-12 20:27:04.216 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.addons.hbase.AbstractTableInputFormat  - Initializing 
HBaseConfiguration
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:host.name=ml41.mlamp.co
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:host.name=ml41.mlamp.co
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_144
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_144
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle 
Corporation
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle 
Corporation
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:java.home=/usr/java/jdk1.8.0_144/jre
2017-12-12 20:27:04.422 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:java.home=/usr/java/jdk1.8.0_144/jre
2017-12-12 20:27:04.423 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002
2017-12-12 20:27:04.423 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Client 
environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002
2017-12-12 20:27:04.424 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Initiating client connection, 
connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181 
sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0, 
quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase
2017-12-12 20:27:04.424 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Initiating client connection, 
connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181 
sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0, 
quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase
2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN  
org.apache.zookeeper.ClientCnxn  - SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it.
2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN  
org.apache.zookeeper.ClientCnxn  - SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it.
2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 
172.17.1.20/172.17.1.20:2181
2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 
172.17.1.20/172.17.1.20:2181
2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Socket connection established to 
172.17.1.20/172.17.1.20:2181, initiating session
2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Socket connection established to 
172.17.1.20/172.17.1.20:2181, initiating session
2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 
172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated timeout 
= 60000
2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO  
org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 
172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated timeout 
= 60000
2017-12-12 20:27:05.024 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] WARN  
org.apache.flink.metrics.MetricGroup  - The operator name DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) exceeded the 80 
characters length limit and was truncated.
2017-12-12 20:27:05.048 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.addons.hbase.AbstractTableInputFormat  - opening split 
(this=com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1@313183a0)[0|[ml42.mlamp.co:60020]|3330�9223370526285553387|3330�9223370526285563387]
2017-12-12 20:27:05.337 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Session: 0x2604070bd951a5e closed
2017-12-12 20:27:05.439 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread] INFO  
org.apache.zookeeper.ClientCnxn  - EventThread shut down
2017-12-12 20:27:05.439 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.zookeeper.ZooKeeper  - Session: 0x2604070bd951a5e closed
2017-12-12 20:27:05.439 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread] INFO  
org.apache.zookeeper.ClientCnxn  - EventThread shut down
2017-12-12 20:27:05.453 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) switched from RUNNING to FINISHED.
2017-12-12 20:27:05.453 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for 
DataSource (at createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff).
2017-12-12 20:27:05.453 [DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams 
are closed for task DataSource (at createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
(16703a0cb1de95710c1f204e440eecff) [FINISHED]
2017-12-12 20:27:05.456 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending final 
execution state FINISHED to JobManager for task DataSource (at 
createInput(ExecutionEnvironment.java:553) 
(com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) 
(16703a0cb1de95710c1f204e440eecff)
2017-12-12 20:27:05.464 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.yarn.YarnTaskManager  - Received task DataSink (collect()) 
(1/1)
2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
(0bd94830e242beb92a8916ac27cd0b89) switched from CREATED to DEPLOYING.
2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak 
safety net for task DataSink (collect()) (1/1) 
(0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING]
2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
2017-12-12 20:27:05.469 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Registering task at network: 
DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
2017-12-12 20:27:05.470 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
(0bd94830e242beb92a8916ac27cd0b89) switched from DEPLOYING to RUNNING.
2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
(0bd94830e242beb92a8916ac27cd0b89) switched from RUNNING to FINISHED.
2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for 
DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89).
2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams 
are closed for task DataSink (collect()) (1/1) 
(0bd94830e242beb92a8916ac27cd0b89) [FINISHED]
2017-12-12 20:27:05.486 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending final 
execution state FINISHED to JobManager for task DataSink (collect()) 
(0bd94830e242beb92a8916ac27cd0b89)
{panel}
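
One more detail, in case anyone wonders about the garbled split boundaries in 
the "opening split" line above: the row keys are built as trainNo + "\0" + time 
in the job, and the NUL separator is simply not printable in the log. A quick 
check, with the values copied from that log line (assuming they really are the 
trainNo and a timestamp string):

{code:java}
public class RowKeyCheck {
    public static void main(String[] args) {
        // Values copied from the "opening split" log line above; the "\0"
        // separator matches what RuleEnginerBatchFlinkAppCopy appends.
        String trainNo = "3330";
        String startTime = "9223370526285553387";
        String startRow = trainNo + "\0" + startTime;
        // 4 + 1 (unprintable NUL) + 19 = 24 characters
        System.out.println(startRow.length());
    }
}
{code}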








> There are two ZooKeeper clients created when reading data from HBase in 
> Flink YARN session mode
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8244
>                 URL: https://issues.apache.org/jira/browse/FLINK-8244
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.3.2
>            Reporter: zhanglibing000
>
> I want to use flink HbaseTableInputFormat api to read data from hbase, and 
> that is ok when I try to run my code in the local model, but when I try to 
> run the code use the flink yarn session model,there some problems in task 
> manager,and it does't has any error message,and no data output from the 
> datasource ,finally I find the zookeeper client Create two times in the task 
> manager log as attachment show,I think there has some problem in flink.
> thanks 
> {code:java}
> import com.alibaba.fastjson.JSON
> import com.google.gson.Gson
> import com.demo.shmetro.RuleEnginerStreamingFlinkApp.logger
> import com.demo.shmetro.bean.json.JsonBeanGenerator
> import com.demo.shmetro.model.TrainData
> import com.demo.shmetro.output.GlobalEntity
> import com.demo.shmetro.rules.{BatchRuleEngineerByScannerExecutor, 
> RuleEngineerByRedisExecutor}
> import com.demo.shmetro.source.HBaseTableInputFormat
> import com.demo.shmetro.util.RedisUtil
> import com.demo.shmetro.utils.{MysqlUtil}
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator, 
> MapOperator}
> import org.apache.flink.api.java.tuple.Tuple2
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.hadoop.hbase.client.Result
> import org.apache.hadoop.hbase.util.Bytes
> import scala.collection.JavaConversions._
> object RuleEnginerBatchFlinkAppCopy {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>     env.getConfig.setGlobalJobParameters(params)
>     val appName = params.get("batch.name")
>     val updateTime = params.getLong("update.time")
>     val tableName = params.get("hbase.table.name")
>     val columnFamily = params.get("hbase.table.cf")
>     val columns = params.get("hbase.table.columns").split(",")
>     val startTime = params.get("start.time")
>     val endTime = params.get("end.time")
>     val trainNo = params.get("train.no")
>     val startRow = new StringBuilder
>     startRow.append(trainNo).append("\0").append(startTime)
>     val endRow = new StringBuilder
>     endRow.append(trainNo).append("\0").append(endTime)
>     val hBaseDataSource: DataSource[Tuple2[String, String]] = 
> env.createInput(new HBaseTableInputFormat[Tuple2[String, String]](tableName, 
> columnFamily, columns, null, startRow.toString(), endRow.toString()) {
>       private val reuse = new Tuple2[String, String]
>       override
>       protected def mapResultToTuple(r: Result): Tuple2[String, String] = {
>         logger.error("**********hbase row: " + reuse)
>         val key = Bytes.toString(r.getRow)
>         val value = getMapResult(r)
>         reuse.setField(key, 0)
>         val data = value.get(key)
>         reuse.setField(JSON.toJSON(data).toString, 1)
>         return reuse
>       }
>     })
>     hBaseDataSource.collect()
>   }
> }
> {code}
> {code:java}
>     import org.apache.commons.lang.StringUtils;
>     import org.apache.flink.addons.hbase.AbstractTableInputFormat;
>     import org.apache.flink.addons.hbase.TableInputFormat;
>     import org.apache.flink.api.java.tuple.Tuple;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.hadoop.hbase.Cell;
>     import org.apache.hadoop.hbase.CellUtil;
>     import org.apache.hadoop.hbase.HBaseConfiguration;
>     import org.apache.hadoop.hbase.HConstants;
>     import org.apache.hadoop.hbase.client.HTable;
>     import org.apache.hadoop.hbase.client.Result;
>     import org.apache.hadoop.hbase.client.Scan;
>     import org.apache.hadoop.hbase.filter.CompareFilter;
>     import org.apache.hadoop.hbase.filter.Filter;
>     import org.apache.hadoop.hbase.filter.RegexStringComparator;
>     import org.apache.hadoop.hbase.filter.RowFilter;
>     import org.apache.hadoop.hbase.util.Bytes;
>     import java.util.HashMap;
>     import java.util.List;
>     import java.util.Map;
>     public  abstract class HBaseTableInputFormat<T extends Tuple> extends 
> AbstractTableInputFormat<T> {
>         private String tableName;
>         private String columnFamily;
>         private String[] columns;
>         private String filterName;
>         private String startRow;
>         private String stopRow;
>         public HBaseTableInputFormat(String tableName, String columnFamily, 
> String[] columns, String filterName, String startRow, String stopRow) {
>             this.tableName = tableName;
>             this.columnFamily = columnFamily;
>             this.columns = columns;
>             this.filterName = filterName;
>             this.startRow = startRow;
>             this.stopRow = stopRow;
>         }
>         protected Scan getScanner() {
>             Scan scan = new Scan();
>             if (!StringUtils.isEmpty(columnFamily)) {
>                 scan.addFamily(columnFamily.getBytes());
>             }
>             if (!StringUtils.isEmpty(filterName)) {
>                 Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, 
> new RegexStringComparator(filterName));
>                 scan.setFilter(filter);
>             }
>             if (columns != null && !StringUtils.isEmpty(columnFamily)) {
>                 for (String column : columns) {
>                     scan.addColumn(columnFamily.getBytes(), 
> column.getBytes());
>                 }
>             }
>             if (!StringUtils.isEmpty(startRow)) {
>                 scan.setStartRow(startRow.getBytes());
>             }
>             if (!StringUtils.isEmpty(stopRow)) {
>                 scan.setStopRow(stopRow.getBytes());
>             }
>             return scan;
>         }
>         protected String getTableName() {
>             return tableName;
>         }
>         protected abstract T mapResultToTuple(Result r);
>         @Override
>         public void configure(Configuration parameters) {
>             table = createTable();
>             if (table != null) {
>                 scan = getScanner();
>             }
>         }
>         private HTable createTable() {
>             LOG.info("Initializing HBaseConfiguration");
>             org.apache.hadoop.conf.Configuration configuration = 
> HBaseConfiguration.create();
>             
> configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
> 1200000);
>             configuration.set("hbase.zookeeper.quorum", "x.x.x.x");
>             configuration.set("hbase.master.info.port", "2181");
>             configuration.set("hbase.master", "172.17.1.21:60000");
>             configuration.setInt("hbase.rpc.timeout", 20000);
>             configuration.setInt("hbase.client.operation.timeout", 30000);
>             try {
>                 return new HTable(configuration, getTableName());
>             } catch (Exception e) {
>                 LOG.error("Error instantiating a new HTable instance", e);
>             }
>             return null;
>         }
>         protected T mapResultToOutType(Result r) {
>             return mapResultToTuple(r);
>         }
>         public Map<String, Map<String, String>> getMapResult(Result result) {
>             Map<String, Map<String, String>> resMap = new HashMap<>();
>             Cell[] cells = result.rawCells();
>             Map<String, String> cellMap = new HashMap<>();
>             for (Cell cell : cells) {
>                 cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), 
> Bytes.toString(CellUtil.cloneValue(cell)));
>             }
>             resMap.put(Bytes.toString(result.getRow()), cellMap);
>             return resMap;
>         }
>     }
> {code}
> {panel:title=Task manager log}
> 2017-12-12 20:18:23.968 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.filecache.FileCache  - User file cache uses 
> directory 
> /data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/flink-dist-cache-2bd20058-9c1a-433b-aa35-e610342aedea
> 2017-12-12 20:18:23.979 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Starting TaskManager actor at 
> akka://flink/user/taskmanager#1035429767.
> 2017-12-12 20:18:23.980 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - TaskManager data connection 
> information: container_1513004072194_0025_01_000002 @ ml41.mlamp.co 
> (dataPort=10331)
> 2017-12-12 20:18:23.981 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - TaskManager has 1 task slot(s).
> 2017-12-12 20:18:23.982 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Memory usage stats: [HEAP: 
> 426/2944/2944 MB, NON HEAP: 39/40/-1 MB (used/committed/max)]
> 2017-12-12 20:18:23.988 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Trying to register at JobManager 
> akka.tcp://[email protected]:6542/user/jobmanager (attempt 1, timeout: 500 
> milliseconds)
> 2017-12-12 20:18:24.163 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Successful registration at 
> JobManager (akka.tcp://[email protected]:6542/user/jobmanager), starting 
> network stack and library cache.
> 2017-12-12 20:18:24.169 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Determined BLOB server address to be 
> ml42.mlamp.co/172.17.1.19:9593. Starting BLOB cache.
> 2017-12-12 20:18:24.172 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage 
> directory /tmp/blobStore-a7b202b0-477f-40e5-a69e-b04bccb28acc
> 2017-12-12 20:27:04.138 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Received task DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> 2017-12-12 20:27:04.139 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) switched from CREATED to DEPLOYING.
> 2017-12-12 20:27:04.140 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak 
> safety net for task DataSource (at createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING]
> 2017-12-12 20:27:04.146 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
> DataSource (at createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
> 2017-12-12 20:27:04.148 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.blob.BlobCache  - Downloading 
> 15eb13effcf291931c0e661c53c3af04e3b63b78 from ml42.mlamp.co/172.17.1.19:9593
> 2017-12-12 20:27:04.171 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Registering task at network: 
> DataSource (at createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
> 2017-12-12 20:27:04.178 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) switched from DEPLOYING to RUNNING.
> 2017-12-12 20:27:04.216 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.addons.hbase.AbstractTableInputFormat  - Initializing 
> HBaseConfiguration
> 2017-12-12 20:27:04.422 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client 
> environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
> 2017-12-12 20:27:04.422 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client environment:host.name=ml41.mlamp.co
> 2017-12-12 20:27:04.422 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_144
> 2017-12-12 20:27:04.422 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle 
> Corporation
> 2017-12-12 20:27:04.422 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.home=/usr/java/jdk1.8.0_144/jre
> 2017-12-12 20:27:04.423 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Client 
> environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002
> 2017-12-12 20:27:04.424 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Initiating client connection, 
> connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181 
> sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0, 
> quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase
> 2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN  
> org.apache.zookeeper.ClientCnxn  - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO  
> org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 
> 172.17.1.20/172.17.1.20:2181
> 2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO  
> org.apache.zookeeper.ClientCnxn  - Socket connection established to 
> 172.17.1.20/172.17.1.20:2181, initiating session
> 2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO  
> org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 
> 172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated 
> timeout = 60000
> 2017-12-12 20:27:05.024 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] WARN  
> org.apache.flink.metrics.MetricGroup  - The operator name DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) exceeded the 80 
> characters length limit and was truncated.
> 2017-12-12 20:27:05.048 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.addons.hbase.AbstractTableInputFormat  - opening split 
> (this=com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1@313183a0)[0|[ml42.mlamp.co:60020]|3330�9223370526285553387|3330�9223370526285563387]
> 2017-12-12 20:27:05.439 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread] 
> INFO  org.apache.zookeeper.ClientCnxn  - EventThread shut down
> 2017-12-12 20:27:05.439 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.zookeeper.ZooKeeper  - Session: 0x2604070bd951a5e closed
> 2017-12-12 20:27:05.453 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) switched from RUNNING to FINISHED.
> 2017-12-12 20:27:05.453 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for 
> DataSource (at createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff).
> 2017-12-12 20:27:05.453 [DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams 
> are closed for task DataSource (at createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) 
> (16703a0cb1de95710c1f204e440eecff) [FINISHED]
> 2017-12-12 20:27:05.456 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending 
> final execution state FINISHED to JobManager for task DataSource (at 
> createInput(ExecutionEnvironment.java:553) 
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) 
> (16703a0cb1de95710c1f204e440eecff)
> 2017-12-12 20:27:05.464 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Received task DataSink (collect()) 
> (1/1)
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
> (0bd94830e242beb92a8916ac27cd0b89) switched from CREATED to DEPLOYING.
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak 
> safety net for task DataSink (collect()) (1/1) 
> (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING]
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
> 2017-12-12 20:27:05.469 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Registering task at network: 
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
> 2017-12-12 20:27:05.470 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
> (0bd94830e242beb92a8916ac27cd0b89) switched from DEPLOYING to RUNNING.
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - DataSink (collect()) (1/1) 
> (0bd94830e242beb92a8916ac27cd0b89) switched from RUNNING to FINISHED.
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for 
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89).
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO  
> org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams 
> are closed for task DataSink (collect()) (1/1) 
> (0bd94830e242beb92a8916ac27cd0b89) [FINISHED]
> 2017-12-12 20:27:05.486 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending 
> final execution state FINISHED to JobManager for task DataSink (collect()) 
> (0bd94830e242beb92a8916ac27cd0b89)
> {panel}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
