[ 
https://issues.apache.org/jira/browse/FLINK-21891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Re0 Beatrice updated FLINK-21891:
---------------------------------
    Description: 
In flink 1.12.0, use Blink Planner to read data from Hbase and write the 
results to Hive via Flink SQL.

The .staging_xxx files on HDFS:
 /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697
 /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616120408195
 /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121007337
 /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121607484
 /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616206808142

I found the following code in 
`org.apache.flink.table.filesystem.FileSystemOutputFormat` caused the problem:
{code:java}
import java.io.File;

@Override
public void finalizeGlobal(int parallelism) {
    try {        
        FileSystemCommitter committer = new FileSystemCommitter(  
          fsFactory, msFactory, overwrite, tmpPath, partitionColumns.length);   
        
        committer.commitUpToCheckpoint(CHECKPOINT_ID);    
    } catch (Exception e) {        
        throw new TableException("Exception in finalizeGlobal", e);    
    } finally {        
        new File(tmpPath.getPath()).delete();   // the error code    
    }
}

{code}
The code in finally code block `new File(..)` can't convert `tmpPath` to HDFS 
file instance, I think the following code is more correct and works for me:
{code:java}
fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);

{code}
A similar code has appeared in the class of PartitionTempFileManager.
  

  was:
In flink 1.12.0, use Blink Planner to read data from Hbase and write the 
results to Hive via Flink SQL.

The .staging_xxx files on HDFS:
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616120408195
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121007337
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121607484
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616206808142

I found the following code in 
`org.apache.flink.table.filesystem.FileSystemOutputFormat` caused the problem:

{code:java}

import java.io.File;

@Override
public void finalizeGlobal(int parallelism) {
    try {        
        FileSystemCommitter committer = new FileSystemCommitter(  
          fsFactory, msFactory, overwrite, tmpPath, partitionColumns.length);   
        
        committer.commitUpToCheckpoint(CHECKPOINT_ID);    
    } catch (Exception e) {        
        throw new TableException("Exception in finalizeGlobal", e);    
    } finally {        
        new File(tmpPath.getPath()).delete();   // the error code    
    }
}

{code}
The code in finally code block `new File(..)` can't convert `tmpPath` to HDFS 
file instance, I think the following code is more correct:
{code:java}

fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);

{code}
A similar code has appeared in the class of PartitionTempFileManager.
 


> The .staging_xxx directory isn't deleted after writing data to hive table in 
> batch mode
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-21891
>                 URL: https://issues.apache.org/jira/browse/FLINK-21891
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Re0 Beatrice
>            Priority: Major
>
> In flink 1.12.0, use Blink Planner to read data from Hbase and write the 
> results to Hive via Flink SQL.
> The .staging_xxx files on HDFS:
>  /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697
>  /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616120408195
>  /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121007337
>  /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121607484
>  /user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616206808142
> I found the following code in 
> `org.apache.flink.table.filesystem.FileSystemOutputFormat` caused the problem:
> {code:java}
> import java.io.File;
> @Override
> public void finalizeGlobal(int parallelism) {
>     try {        
>         FileSystemCommitter committer = new FileSystemCommitter(  
>           fsFactory, msFactory, overwrite, tmpPath, partitionColumns.length); 
>           
>         committer.commitUpToCheckpoint(CHECKPOINT_ID);    
>     } catch (Exception e) {        
>         throw new TableException("Exception in finalizeGlobal", e);    
>     } finally {        
>         new File(tmpPath.getPath()).delete();   // the error code    
>     }
> }
> {code}
> The code in finally code block `new File(..)` can't convert `tmpPath` to HDFS 
> file instance, I think the following code is more correct and works for me:
> {code:java}
> fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
> {code}
> A similar code has appeared in the class of PartitionTempFileManager.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to