[ https://issues.apache.org/jira/browse/GOBBLIN-1995?focusedWorklogId=902461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-902461 ]

ASF GitHub Bot logged work on GOBBLIN-1995:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jan/24 02:18
            Start Date: 30/Jan/24 02:18
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3871:
URL: https://github.com/apache/gobblin/pull/3871#discussion_r1470505601


##########
gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java:
##########
@@ -172,17 +172,23 @@ public DataWriter<D> load(final GenericRecord key)
                 new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() {
                   @Override
                   public DataWriter<D> get() {
+                    Future<DataWriter<D>> future = null;
                     try {
                       log.info(String.format("Adding one more writer to loading cache of existing writer "
                           + "with size = %d", partitionWriters.size()));
-                      Future<DataWriter<D>> future = createWriterPool.submit(() -> createPartitionWriter(key));
+                      future = createWriterPool.submit(() -> createPartitionWriter(key));
                       state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1);
                       return future.get(writeTimeoutInterval, TimeUnit.SECONDS);
                     } catch (ExecutionException | InterruptedException e) {
                       throw new RuntimeException("Error creating writer", e);
                     } catch (TimeoutException e) {
                       throw new RuntimeException(String.format("Failed to create writer due to timeout. The operation timed out after %s seconds.", writeTimeoutInterval), e);
                     }
+                    finally {
+                      if (future != null && !future.isDone()) {
+                        future.cancel(true);

Review Comment:
   Does this have the intended effect? In the issues you were seeing, were the threads sleeping or waiting on I/O, so that they need to be interrupted via cancel?
   
   I am asking because Future.cancel(true) only interrupts the thread: unless the thread is actively waiting or sleeping, it has to check for itself whether it has been asked to cancel.
   
   See the following for what I am describing:
   https://stackoverflow.com/questions/28043225/future-cancel-does-not-work
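
To make the point in the review comment concrete, here is a small, self-contained Java sketch. It is not Gobblin code; CancelDemo, stopsAfterCancel, and the three sample tasks are hypothetical names chosen for illustration. Each task is submitted to an executor, Future.get is allowed to time out, cancel(true) is called, and the sketch checks whether the task actually returned. Because cancel(true) only sets the worker thread's interrupt flag, a loop that never checks that flag keeps running, while a loop that polls Thread.currentThread().isInterrupted() or a task blocked in Thread.sleep stops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelDemo {

    // Spins for about 3 seconds and never checks the interrupt flag.
    static final Runnable IGNORES_INTERRUPT = () -> {
        long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(3);
        while (System.nanoTime() < end) {
            // busy work that never looks at the interrupt flag
        }
    };

    // Spins until it observes that its interrupt flag has been set.
    static final Runnable CHECKS_INTERRUPT = () -> {
        while (!Thread.currentThread().isInterrupted()) {
            // busy work that polls the interrupt flag on every iteration
        }
    };

    // Blocks in Thread.sleep, which throws InterruptedException when interrupted.
    static final Runnable SLEEPS = () -> {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(10));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status and return
        }
    };

    // Runs the task, lets get() time out after 100 ms, calls cancel(true), and reports
    // whether the task body actually returned within stopWaitMs afterwards.
    static boolean stopsAfterCancel(Runnable task, long stopWaitMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a stuck task must not keep the JVM alive
            return t;
        });
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Future<?> future = pool.submit(() -> {
            started.countDown();
            try {
                task.run();
            } finally {
                finished.countDown();
            }
        });
        try {
            started.await(); // make sure cancel(true) hits a running task, not a queued one
            try {
                future.get(100, TimeUnit.MILLISECONDS);
                return true; // completed before the timeout
            } catch (TimeoutException e) {
                future.cancel(true); // only sets the worker thread's interrupt flag
            }
            return finished.await(stopWaitMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ignores interrupt, stopped: " + stopsAfterCancel(IGNORES_INTERRUPT, 500));
        System.out.println("checks interrupt, stopped:  " + stopsAfterCancel(CHECKS_INTERRUPT, 500));
        System.out.println("sleeping, stopped:          " + stopsAfterCancel(SLEEPS, 500));
    }
}
```

In a typical run, expect false for the busy loop that ignores interrupts and true for the other two. The timings (100 ms timeout, 500 ms grace period, 3 s spin) are arbitrary choices for the demo, not values from the PR.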





Issue Time Tracking
-------------------

    Worklog Id:     (was: 902461)
    Time Spent: 20m  (was: 10m)

> Kill the writer thread when timeout happens to release the lock  
> -----------------------------------------------------------------
>
>                 Key: GOBBLIN-1995
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1995
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, when a timeout happens while talking to HDFS, we time out and fail
> the job, but the thread is not released, so all other calls to HDFS are
> blocked by this hanging thread.
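
The scenario in the issue (a timed-out call that keeps holding a resource and blocks later callers) can be reproduced in miniature. The sketch below is hypothetical and is not Gobblin or HDFS code: a ReentrantLock stands in for whatever the hung HDFS call holds, Thread.sleep stands in for the blocking call, and hangingCall, callWithTimeout, and lockFreedAfterTimeout are illustrative names. callWithTimeout follows the same submit / timed get / cancel-in-finally shape as the diff in the review.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

public class TimeoutCancelSketch {

    // Hypothetical stand-in for a hung HDFS call: takes a shared lock, then blocks.
    // Thread.sleep is used because it responds to interruption; a real client call may not.
    static String hangingCall(ReentrantLock lock) throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(60));
            return "done";
        } finally {
            lock.unlock(); // also runs when the sleep is interrupted
        }
    }

    // Same shape as the change under review: submit, wait with a timeout, and cancel
    // the future in finally if it is still running (only when cancelOnTimeout is true).
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> call,
                                 long timeoutMs, boolean cancelOnTimeout) {
        Future<T> future = null;
        try {
            future = pool.submit(call);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException("Call failed", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out after " + timeoutMs + " ms", e);
        } finally {
            if (cancelOnTimeout && future != null && !future.isDone()) {
                future.cancel(true); // interrupts the worker thread
            }
        }
    }

    // Returns true if another thread can take the lock shortly after the timed-out call.
    static boolean lockFreedAfterTimeout(boolean cancelOnTimeout) {
        ReentrantLock lock = new ReentrantLock();
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a hung worker must not keep the JVM alive
            return t;
        });
        try {
            callWithTimeout(pool, () -> hangingCall(lock), 100, cancelOnTimeout);
        } catch (RuntimeException expectedTimeout) {
            // the timeout itself is expected; what matters is the lock afterwards
        }
        try {
            boolean acquired = lock.tryLock(500, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cancel, lock freed:   " + lockFreedAfterTimeout(false));
        System.out.println("with cancel(true), lock freed: " + lockFreedAfterTimeout(true));
    }
}
```

Without the cancel, the lock is expected to stay held after the timeout; with cancel(true), the sleep is interrupted, the finally block unlocks, and another thread can acquire the lock. This only works because Thread.sleep responds to interruption. A call blocked somewhere that ignores interrupts (for example, a read on a classic java.io socket stream on a platform thread) could keep the lock even after cancel(true), which is the case the review comment asks about. Whether the HDFS client calls involved here respond to interruption is not established by this issue.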



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
