[ https://issues.apache.org/jira/browse/FLINK-18056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-18056:
----------------------------
    Component/s: Connectors / Hive

> Hive file sink throws exception when the target in-progress file exists.
> ------------------------------------------------------------------------
>
>                 Key: FLINK-18056
>                 URL: https://issues.apache.org/jira/browse/FLINK-18056
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive
>            Reporter: Yun Gao
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> Currently, after a failover or restart, the Hive file sink tries to rewrite 
> the data produced since the last successful checkpoint. However, the previous 
> in-progress file is neither deleted nor opened in overwrite mode, so the 
> following exception occurs after restarting:
> {code:java}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /user/hive/warehouse/datalake/dt=2020-06-01/hr=22/.part-0-10.inprogress for DFSClient_NONMAPREDUCE_-1017064593_62 for client 100.96.206.42 because current leaseholder is trying to recreate file.
> {code}
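>
> The failure mode can be illustrated with the plain Hadoop FileSystem API: judging from the stack trace, the file is created with overwrite disabled, which fails on a path whose lease is still held from before the failover, while deleting the stale file first (or creating with overwrite enabled) would succeed. The sketch below only illustrates these two fix directions and is not the actual sink code; the path is taken from the report and the class name is made up:
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class InProgressFileRecreation {
>     public static void main(String[] args) throws Exception {
>         // The in-progress file left behind by the writer that ran before the failover.
>         Path inProgress = new Path(
>                 "/user/hive/warehouse/datalake/dt=2020-06-01/hr=22/.part-0-10.inprogress");
>         FileSystem fs = inProgress.getFileSystem(new Configuration());
>
>         // What effectively happens today: creating the file with overwrite=false
>         // while the old lease is still active throws AlreadyBeingCreatedException
>         // ("current leaseholder is trying to recreate file"):
>         // fs.create(inProgress, false);
>
>         // Fix direction (1): delete the stale in-progress file before re-creating it.
>         if (fs.exists(inProgress)) {
>             fs.delete(inProgress, false);
>         }
>         // Fix direction (2): create the file in overwrite mode so HDFS replaces it.
>         fs.create(inProgress, true).close();
>     }
> }
> {code}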
> The full stack trace of the exception is:
> {code:java}
> org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
>     at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
>     at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
>     at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:234)
>     at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:207)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:284)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>     at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at StreamExecCalc$16.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at StreamExecCalc$2.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
>     at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
>     at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
>     ... 36 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
>     ... 37 more
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /user/hive/warehouse/datalake/dt=2020-06-01/hr=22/.part-0-10.inprogress for DFSClient_NONMAPREDUCE_-1017064593_62 for client 100.96.206.42 because current leaseholder is trying to recreate file.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3075)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2783)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2676)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2561)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:593)
>     at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:111)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:393)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
>     at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>     at com.sun.proxy.$Proxy35.create(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:297)
>     at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>     at com.sun.proxy.$Proxy36.create(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:265)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1274)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176)
>     at parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
>     at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:66)
>     at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
>     at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
>     at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
>     ... 41 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
