[
https://issues.apache.org/jira/browse/FLINK-8244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-8244:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> There are two zookeeper client created when read data from hbase in the flink
> yarn session model
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-8244
> URL: https://issues.apache.org/jira/browse/FLINK-8244
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: 1.3.2
> Reporter: LionelZ
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I want to use the Flink HBaseTableInputFormat API to read data from HBase.
> This works when I run my code in local mode, but when I run the same code
> in Flink YARN session mode, there is a problem in the task manager: it
> does not log any error message, and no data is output from the data
> source. Finally, I found that the ZooKeeper client is created twice in the
> task manager, as the attached task manager log shows. I think there is a
> problem in Flink.
> Thanks.
> {code:java}
> import com.alibaba.fastjson.JSON
> import com.google.gson.Gson
> import com.demo.shmetro.RuleEnginerStreamingFlinkApp.logger
> import com.demo.shmetro.bean.json.JsonBeanGenerator
> import com.demo.shmetro.model.TrainData
> import com.demo.shmetro.output.GlobalEntity
> import com.demo.shmetro.rules.{BatchRuleEngineerByScannerExecutor,
> RuleEngineerByRedisExecutor}
> import com.demo.shmetro.source.HBaseTableInputFormat
> import com.demo.shmetro.util.RedisUtil
> import com.demo.shmetro.utils.{MysqlUtil}
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator,
> MapOperator}
> import org.apache.flink.api.java.tuple.Tuple2
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.hadoop.hbase.client.Result
> import org.apache.hadoop.hbase.util.Bytes
> import scala.collection.JavaConversions._
> object RuleEnginerBatchFlinkAppCopy {
> def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> env.getConfig.setGlobalJobParameters(params)
> val appName = params.get("batch.name")
> val updateTime = params.getLong("update.time")
> val tableName = params.get("hbase.table.name")
> val columnFamily = params.get("hbase.table.cf")
> val columns = params.get("hbase.table.columns").split(",")
> val startTime = params.get("start.time")
> val endTime = params.get("end.time")
> val trainNo = params.get("train.no")
> val startRow = new StringBuilder
> startRow.append(trainNo).append("\0").append(startTime)
> val endRow = new StringBuilder
> endRow.append(trainNo).append("\0").append(endTime)
> val hBaseDataSource: DataSource[Tuple2[String, String]] =
> env.createInput(new HBaseTableInputFormat[Tuple2[String, String]](tableName,
> columnFamily, columns, null, startRow.toString(), endRow.toString()) {
> private val reuse = new Tuple2[String, String]
> override
> protected def mapResultToTuple(r: Result): Tuple2[String, String] = {
> logger.error("**********hbase row: " + reuse)
> val key = Bytes.toString(r.getRow)
> val value = getMapResult(r)
> reuse.setField(key, 0)
> val data = value.get(key)
> reuse.setField(JSON.toJSON(data).toString, 1)
> return reuse
> }
> })
> hBaseDataSource.collect()
> }
> }
> {code}
> {code:java}
> import org.apache.commons.lang.StringUtils;
> import org.apache.flink.addons.hbase.AbstractTableInputFormat;
> import org.apache.flink.addons.hbase.TableInputFormat;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.hadoop.hbase.Cell;
> import org.apache.hadoop.hbase.CellUtil;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.HConstants;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.filter.CompareFilter;
> import org.apache.hadoop.hbase.filter.Filter;
> import org.apache.hadoop.hbase.filter.RegexStringComparator;
> import org.apache.hadoop.hbase.filter.RowFilter;
> import org.apache.hadoop.hbase.util.Bytes;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> public abstract class HBaseTableInputFormat<T extends Tuple> extends
> AbstractTableInputFormat<T> {
> private String tableName;
> private String columnFamily;
> private String[] columns;
> private String filterName;
> private String startRow;
> private String stopRow;
> public HBaseTableInputFormat(String tableName, String columnFamily,
> String[] columns, String filterName, String startRow, String stopRow) {
> this.tableName = tableName;
> this.columnFamily = columnFamily;
> this.columns = columns;
> this.filterName = filterName;
> this.startRow = startRow;
> this.stopRow = stopRow;
> }
> protected Scan getScanner() {
> Scan scan = new Scan();
> if (!StringUtils.isEmpty(columnFamily)) {
> scan.addFamily(columnFamily.getBytes());
> }
> if (!StringUtils.isEmpty(filterName)) {
> Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
> new RegexStringComparator(filterName));
> scan.setFilter(filter);
> }
> if (columns != null && !StringUtils.isEmpty(columnFamily)) {
> for (String column : columns) {
> scan.addColumn(columnFamily.getBytes(),
> column.getBytes());
> }
> }
> if (!StringUtils.isEmpty(startRow)) {
> scan.setStartRow(startRow.getBytes());
> }
> if (!StringUtils.isEmpty(stopRow)) {
> scan.setStopRow(stopRow.getBytes());
> }
> return scan;
> }
> protected String getTableName() {
> return tableName;
> }
> protected abstract T mapResultToTuple(Result r);
> @Override
> public void configure(Configuration parameters) {
> table = createTable();
> if (table != null) {
> scan = getScanner();
> }
> }
> private HTable createTable() {
> LOG.info("Initializing HBaseConfiguration");
> org.apache.hadoop.conf.Configuration configuration =
> HBaseConfiguration.create();
>
> configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
> 1200000);
> configuration.set("hbase.zookeeper.quorum", "x.x.x.x");
> configuration.set("hbase.master.info.port", "2181");
> configuration.set("hbase.master", "172.17.1.21:60000");
> configuration.setInt("hbase.rpc.timeout", 20000);
> configuration.setInt("hbase.client.operation.timeout", 30000);
> try {
> return new HTable(configuration, getTableName());
> } catch (Exception e) {
> LOG.error("Error instantiating a new HTable instance", e);
> }
> return null;
> }
> protected T mapResultToOutType(Result r) {
> return mapResultToTuple(r);
> }
> public Map<String, Map<String, String>> getMapResult(Result result) {
> Map<String, Map<String, String>> resMap = new HashMap<>();
> Cell[] cells = result.rawCells();
> Map<String, String> cellMap = new HashMap<>();
> for (Cell cell : cells) {
> cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
> Bytes.toString(CellUtil.cloneValue(cell)));
> }
> resMap.put(Bytes.toString(result.getRow()), cellMap);
> return resMap;
> }
> }
> {code}
> {panel:title=Task manager log}
> 2017-12-12 20:18:23.968 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.runtime.filecache.FileCache - User file cache uses
> directory
> /data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/flink-dist-cache-2bd20058-9c1a-433b-aa35-e610342aedea
> 2017-12-12 20:18:23.979 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Starting TaskManager actor at
> akka://flink/user/taskmanager#1035429767.
> 2017-12-12 20:18:23.980 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - TaskManager data connection
> information: container_1513004072194_0025_01_000002 @ ml41.mlamp.co
> (dataPort=10331)
> 2017-12-12 20:18:23.981 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - TaskManager has 1 task slot(s).
> 2017-12-12 20:18:23.982 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Memory usage stats: [HEAP:
> 426/2944/2944 MB, NON HEAP: 39/40/-1 MB (used/committed/max)]
> 2017-12-12 20:18:23.988 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Trying to register at JobManager
> akka.tcp://[email protected]:6542/user/jobmanager (attempt 1, timeout: 500
> milliseconds)
> 2017-12-12 20:18:24.163 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Successful registration at
> JobManager (akka.tcp://[email protected]:6542/user/jobmanager), starting
> network stack and library cache.
> 2017-12-12 20:18:24.169 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Determined BLOB server address to be
> ml42.mlamp.co/172.17.1.19:9593. Starting BLOB cache.
> 2017-12-12 20:18:24.172 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage
> directory /tmp/blobStore-a7b202b0-477f-40e5-a69e-b04bccb28acc
> 2017-12-12 20:27:04.138 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Received task DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> 2017-12-12 20:27:04.139 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) switched from CREATED to DEPLOYING.
> 2017-12-12 20:27:04.140 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak
> safety net for task DataSource (at createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING]
> 2017-12-12 20:27:04.146 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
> DataSource (at createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
> 2017-12-12 20:27:04.148 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.blob.BlobCache - Downloading
> 15eb13effcf291931c0e661c53c3af04e3b63b78 from ml42.mlamp.co/172.17.1.19:9593
> 2017-12-12 20:27:04.171 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
> DataSource (at createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) [DEPLOYING].
> 2017-12-12 20:27:04.178 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) switched from DEPLOYING to RUNNING.
> 2017-12-12 20:27:04.216 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.addons.hbase.AbstractTableInputFormat - Initializing
> HBaseConfiguration
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client
> environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client
> environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:host.name=ml41.mlamp.co
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:host.name=ml41.mlamp.co
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_144
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_144
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle
> Corporation
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle
> Corporation
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client
> environment:java.home=/usr/java/jdk1.8.0_144/jre
> 2017-12-12 20:27:04.422 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client
> environment:java.home=/usr/java/jdk1.8.0_144/jre
> hFlinkAppCopy$$anon$1)) (1/1)] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002
> 2017-12-12 20:27:04.423 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Client
> environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002
> 2017-12-12 20:27:04.424 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Initiating client connection,
> connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181
> sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0,
> quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase
> 2017-12-12 20:27:04.424 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Initiating client connection,
> connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181
> sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0,
> quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase
> 2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN
> org.apache.zookeeper.ClientCnxn - SASL configuration failed:
> javax.security.auth.login.LoginException: No JAAS configuration section named
> 'Client' was found in specified JAAS configuration file:
> '/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper
> server without SASL authentication, if Zookeeper server allows it.
> 2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN
> org.apache.zookeeper.ClientCnxn - SASL configuration failed:
> javax.security.auth.login.LoginException: No JAAS configuration section named
> 'Client' was found in specified JAAS configuration file:
> '/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper
> server without SASL authentication, if Zookeeper server allows it.
> 2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Opening socket connection to server
> 172.17.1.20/172.17.1.20:2181
> 2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Opening socket connection to server
> 172.17.1.20/172.17.1.20:2181
> 2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Socket connection established to
> 172.17.1.20/172.17.1.20:2181, initiating session
> 2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Socket connection established to
> 172.17.1.20/172.17.1.20:2181, initiating session
> 2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Session establishment complete on server
> 172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated
> timeout = 60000
> 2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO
> org.apache.zookeeper.ClientCnxn - Session establishment complete on server
> 172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated
> timeout = 60000
> 2017-12-12 20:27:05.024 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] WARN
> org.apache.flink.metrics.MetricGroup - The operator name DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) exceeded the 80
> characters length limit and was truncated.
> 2017-12-12 20:27:05.048 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.addons.hbase.AbstractTableInputFormat - opening split
> (this=com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1@313183a0)[0|[ml42.mlamp.co:60020]|3330�9223370526285553387|3330�9223370526285563387]
> 2017-12-12 20:27:05.337 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO urce (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Session: 0x2604070bd951a5e closed
> 2017-12-12 20:27:05.439 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread]
> INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
> 2017-12-12 20:27:05.439 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.zookeeper.ZooKeeper - Session: 0x2604070bd951a5e closed
> 2017-12-12 20:27:05.439 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread]
> INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
> 2017-12-12 20:27:05.453 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) switched from RUNNING to FINISHED.
> 2017-12-12 20:27:05.453 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Freeing task resources for
> DataSource (at createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff).
> 2017-12-12 20:27:05.453 [DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams
> are closed for task DataSource (at createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)
> (16703a0cb1de95710c1f204e440eecff) [FINISHED]
> 2017-12-12 20:27:05.456 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending
> final execution state FINISHED to JobManager for task DataSource (at
> createInput(ExecutionEnvironment.java:553)
> (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1))
> (16703a0cb1de95710c1f204e440eecff)
> 2017-12-12 20:27:05.464 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.yarn.YarnTaskManager - Received task DataSink (collect())
> (1/1)
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1)
> (0bd94830e242beb92a8916ac27cd0b89) switched from CREATED to DEPLOYING.
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak
> safety net for task DataSink (collect()) (1/1)
> (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING]
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
> 2017-12-12 20:27:05.469 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING].
> 2017-12-12 20:27:05.470 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1)
> (0bd94830e242beb92a8916ac27cd0b89) switched from DEPLOYING to RUNNING.
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1)
> (0bd94830e242beb92a8916ac27cd0b89) switched from RUNNING to FINISHED.
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Freeing task resources for
> DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89).
> 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams
> are closed for task DataSink (collect()) (1/1)
> (0bd94830e242beb92a8916ac27cd0b89) [FINISHED]
> 2017-12-12 20:27:05.486 [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending
> final execution state FINISHED to JobManager for task DataSink (collect())
> (0bd94830e242beb92a8916ac27cd0b89)
> {panel}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)