Niels Basjes created FLINK-4311:
-----------------------------------

             Summary: TableInputFormat fails when reused on next split
                 Key: FLINK-4311
                 URL: https://issues.apache.org/jira/browse/FLINK-4311
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.0.3
            Reporter: Niels Basjes
            Assignee: Niels Basjes
            Priority: Critical


We have written a batch job that uses data from HBase by means of using the 
TableInputFormat.

We have found that this class sometimes fails with this exception:
{quote}
java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
Task 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
 rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
        at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
        at 
org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
        at 
org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
        at 
org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
        at 
org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
        at 
org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
        at 
org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
        at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Task 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
 rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
        at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
        at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
        at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
        at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        ... 10 more
{quote}

As you can see the ThreadPoolExecutor was terminated at this point.

We tracked it down to the fact that 
# the configure method opens the table
# the open method obtains the result scanner
# the closes method closes the table.

If a second split arrives on the same instance then the open method will fail 
because the table has already been closed.

We also found that this error varies with the versions of HBase that are used. 
I have also seen this exception:
{quote}
Caused by: java.io.IOException: hconnection-0x19d37183 closed
        at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
        at 
org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
        ... 37 more
{quote}

I found that in the [documentation of the InputFormat 
interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
 is clearly states
{quote}IMPORTANT NOTE: Input formats must be written such that an instance can 
be opened again after it was closed. That is due to the fact that the input 
format is used for potentially multiple splits. After a split is done, the 
format's close function is invoked and, if another split is available, the open 
function is invoked afterwards for the next split.{quote}

It appears that this specific InputFormat has not been checked against this 
constraint.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to