[ https://issues.apache.org/jira/browse/FLINK-30075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhanglu153 updated FLINK-30075:
-------------------------------
    Description: 
While testing a Kafka-to-Hive lookup join, I cut only the connection between the TaskManager (TM) and ZooKeeper after the data had been successfully loaded into the cache. Because a Flink restart strategy is configured, the task is then restarted. Note that the TM process does not change at this point; the task restarts in the same container.
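The report does not say which restart strategy is configured, so the following is only a minimal sketch assuming a fixed-delay strategy (the attempt count and delay are illustrative); any configured strategy produces the same in-process restart:
{code:java}
// Assumed restart strategy (not from the report): with any strategy configured,
// the task restarts in the same TaskManager process after the ZooKeeper
// connection is lost.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                                // restart attempts (illustrative)
        Time.of(10, TimeUnit.SECONDS)));  // delay between attempts (illustrative)
{code}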

The task SQL is as follows:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Kafka source table; the proctime attribute is required by the
// FOR SYSTEM_TIME AS OF lookup join below.
tableEnv.executeSql("CREATE TABLE kafka_table_1 (\n" +
        "  a string,\n" +
        "  b int,\n" +
        "  c string,\n" +
        "  log_ts TIMESTAMP(0),\n" +
        "  proctime AS PROCTIME()\n" +
        ") WITH (\n" +
        "    'connector' = 'kafka',\n" +
        "    'topic' = 'test',\n" +
        "    'scan.startup.mode' = 'latest-offset',\n" +
        "    'properties.bootstrap.servers' = '172.16.144.208:9092',\n" +
        "    'format' = 'csv'\n" +
        ")");
// Register the Hive catalog and create the dimension table for the lookup.
String name = "hive_catalog";
String hiveConfDir = "/cloud/service/flink/conf";
HiveCatalog hive = new HiveCatalog(name, null, hiveConfDir);
hive.open();
tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("create table if not exists ttt(a string, b int, c string)");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// Processing-time lookup join of the Kafka stream against the Hive table.
tableEnv.executeSql("select * from default_catalog.default_database.kafka_table_1 as u " +
        "join ttt for system_time as of u.proctime as o on u.a = o.a").print();
{code}
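The cache that later fails to reload is repopulated on a TTL controlled by the Hive connector option lookup.join.cache.ttl (default 60 min). As a hedged sketch for reproducing faster (the 5-minute value is arbitrary, and dynamic table options must be enabled first):
{code:java}
// Optional tweak (assumed, not part of the report): shorten the Hive lookup
// cache TTL via a dynamic table options hint so the reload path runs more often.
tableEnv.getConfig().getConfiguration()
        .setString("table.dynamic-table-options.enabled", "true");
tableEnv.executeSql("select * from default_catalog.default_database.kafka_table_1 as u " +
        "join ttt /*+ OPTIONS('lookup.join.cache.ttl'='5 min') */ " +
        "for system_time as of u.proctime as o on u.a = o.a").print();
{code}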
After the task restarts successfully, data is produced to Kafka. As soon as records arrive, an error is logged indicating that loading data into the lookup cache failed:
{code:java}
2022-11-17 14:09:16,041 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction  [] - Populating lookup join cache
2022-11-17 14:09:16,051 INFO  org.apache.hadoop.conf.Configuration                        [] - getProps 1230612033
2022-11-17 14:09:16,053 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler           [] - java.io.IOException: org.apache.flink.shaded.hadoop2.com.google.protobuf.ServiceException: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'., while invoking ClientNamenodeProtocolTranslatorPB.getFileInfo over hdp-hadoop-hdp-namenode-1.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.6.85:9000. Trying to failover immediately.
2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client                                [] - The ping interval is 60000 ms.
2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client                                [] - Connecting to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000
2022-11-17 14:09:16,055 DEBUG org.apache.hadoop.security.UserGroupInformation             [] - PrivilegedAction as:hadoop/hdp-flinkhistory-hdp-flink-history-6754db4f9c-8b8nd.hdp-flinkhistory-hdp-flink-history.hdp-dev-env-3.svc.cluster.lo...@dahua.com (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825)
2022-11-17 14:09:16,056 DEBUG org.apache.hadoop.security.SaslRpcClient                    [] - Sending sasl message state: NEGOTIATE
2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient                    [] - Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
2022-11-17 14:09:16,057 INFO  org.apache.hadoop.security.SaslRpcClient                    [] - getServerPrincipal 1230612033
2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient                    [] - Get kerberos info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=, serverPrincipal=dfs.namenode.kerberos.principal)
2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.security.SaslRpcClient                    [] - getting serverKey: dfs.namenode.kerberos.principal conf value: null principal: null
2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.ipc.Client                                [] - closing ipc connection to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
java.io.IOException: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:891) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection.access$3700(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1615) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client.call(Client.java:1446) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client.call(Client.java:1399) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source) ~[?:?]
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:800) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at com.sun.proxy.$Proxy28.getFileInfo(Unknown Source) ~[?:?]
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1673) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1632) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:334) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
        at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:318) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
        at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:83) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
        at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.lambda$eval$1(HdpFileSystemLookupFunction.java:59) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:100) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.eval(HdpFileSystemLookupFunction.java:58) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at LookupFunction$10.flatMap(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at StreamExecCalc$7.processElement(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
Caused by: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
        at org.apache.hadoop.security.SaslRpcClient.getServerPrincipal(SaslRpcClient.java:326) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.security.SaslRpcClient.createSaslClient(SaslRpcClient.java:231) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:159) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:391) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:617) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection.access$2200(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:829) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_342]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_342]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2012) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
        ... 56 more{code}
The task itself did not fail, but the Hive connector failed to load data into the lookup cache.
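As the log message itself suggests, the closed-classloader check can be silenced in flink-conf.yaml; note this only disables the check and does not fix the failed cache reload:
{code}
classloader.check-leaked-classloader: false
{code}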

The Hadoop version used in the test is 2.10.1. During testing we found that switching to Hadoop 2.8.5 does not throw this exception.
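The DEBUG line "getting serverKey: dfs.namenode.kerberos.principal conf value: null" suggests the Hadoop Configuration visible to the restarted task no longer carries the NameNode principal. A hypothetical diagnostic (not part of the report) showing exactly what the SASL client looks up:
{code:java}
// Hypothetical check: the SASL client resolves the server principal from this
// key; per the DEBUG log it resolves to null after the restart, which yields
// "Failed to specify server's Kerberos principal name".
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
String serverPrincipal = hadoopConf.get("dfs.namenode.kerberos.principal");
System.out.println("dfs.namenode.kerberos.principal = " + serverPrincipal);
{code}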

> Failed to load data to the cache after the hive lookup join task is restarted
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-30075
>                 URL: https://issues.apache.org/jira/browse/FLINK-30075
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.12.2
>            Reporter: zhanglu153
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
