[ 
https://issues.apache.org/jira/browse/GOBBLIN-1995?focusedWorklogId=910410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-910410
 ]

ASF GitHub Bot logged work on GOBBLIN-1995:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/24 21:36
            Start Date: 18/Mar/24 21:36
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3871:
URL: https://github.com/apache/gobblin/pull/3871#discussion_r1529316744


##########
gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java:
##########
@@ -172,17 +172,23 @@ public DataWriter<D> load(final GenericRecord key)
                 new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() 
{
                   @Override
                   public DataWriter<D> get() {
+                    Future<DataWriter<D>> future = null;
                     try {
                       log.info(String.format("Adding one more writer to 
loading cache of existing writer "
                           + "with size = %d", partitionWriters.size()));
-                      Future<DataWriter<D>> future = 
createWriterPool.submit(() -> createPartitionWriter(key));
+                      future = createWriterPool.submit(() -> 
createPartitionWriter(key));
                       state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, 
partitionWriters.size() + 1);
                       return future.get(writeTimeoutInterval, 
TimeUnit.SECONDS);
                     } catch (ExecutionException | InterruptedException e) {
                       throw new RuntimeException("Error creating writer", e);
                     } catch (TimeoutException e) {
                       throw new RuntimeException(String.format("Failed to 
create writer due to timeout. The operation timed out after %s seconds.", 
writeTimeoutInterval), e);
                     }
+                    finally {
+                      if (future != null && !future.isDone()) {
+                        future.cancel(true);

Review Comment:
   @homatthew thanks for the information. The thread here is blocking on 
getting the HDFS mount table which is an IO operation. I also did a test to do 
a for loop to talk with HDFS and do timeout, and make sure cancel can work 
correctly in this case. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 910410)
    Time Spent: 0.5h  (was: 20m)

> Kill the writer thread when timeout happens to release the lock  
> -----------------------------------------------------------------
>
>                 Key: GOBBLIN-1995
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1995
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Now when there is timeout happens talking with HDFS, we timeout and fail the 
> job but the thread won't be released in this case, and all other calls to 
> HDFS will be blocked by this hanging thread. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to