[ https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15504565#comment-15504565 ]
ASF GitHub Bot commented on FLINK-4311:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79472506

--- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
+	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+	// These are the row ids AND also the values we will put in the test table
+	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+	@Before
+	public void createTestTable() throws IOException {
+		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+		HTable table = openTable(tableName);
+
+		for (String rowId : ROW_IDS) {
+			byte[] rowIdBytes = rowId.getBytes();
+			Put p = new Put(rowIdBytes);
+			// Use the rowId as the value to facilitate the testing better
+			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+			table.put(p);
+		}
+
+		table.close();
+	}
+
+	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+		@Override
+		protected Scan getScanner() {
+			return new Scan();
+		}
+
+		@Override
+		protected String getTableName() {
+			return TEST_TABLE_NAME;
+		}
+
+		@Override
+		protected Tuple1<String> mapResultToTuple(Result r) {
+			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+		}
+	}
+
+	@Test
+	public void testTableInputFormat() {
--- End diff --

Can we make this test a bit more lightweight and not execute a Flink program? Instead we could test the interface methods of the InputFormat, such as:
- createInputSplits
- configure
- open
- nextRecord
- close

If you split the test into several methods, please make sure that HBase is only initialized once with `@BeforeClass`.

> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
>                 Key: FLINK-4311
>                 URL: https://issues.apache.org/jira/browse/FLINK-4311
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Critical
>
> We have written a batch job that uses data from HBase by means of the TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
> 	at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
> 	at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
> 	at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
> 	at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
> 	at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> 	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> 	at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
> 	... 10 more
> {quote}
> As you can see, the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that
> # the configure method opens the table,
> # the open method obtains the result scanner, and
> # the close method closes the table.
> If a second split arrives on the same instance, the open method will fail because the table has already been closed.
> We also found that this error varies with the version of HBase that is used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
> 	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
> 	... 37 more
> {quote}
> I found that the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html] clearly states:
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That is due to the fact that the input format is used for potentially multiple splits.
> After a split is done, the format's close function is invoked and, if another split is available, the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this constraint.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
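The contract quoted above can be sketched without any Flink or HBase dependency. The following is a minimal plain-Java illustration, not the actual TableInputFormat code, and every name in it is a hypothetical stand-in: the List plays the role of the long-lived HBase table, the Iterator the role of the per-split ResultScanner. The point is the fix the issue implies: long-lived state is created in configure(), only per-split state is released in close(), so the same instance can be opened again for the next split.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical sketch of the InputFormat lifecycle contract:
 * configure() runs once per instance, while open() and close()
 * run once per split, so open() must still work after close().
 */
class ReusableFormatSketch {
	private List<String> table;        // long-lived resource: created in configure()
	private Iterator<String> scanner;  // per-split resource: created in open()

	void configure() {
		// Runs once. Anything open() needs later is set up here
		// and must NOT be torn down in close().
		table = Arrays.asList("000", "111", "222");
	}

	void open() {
		if (table == null) {
			// This is the FLINK-4311 failure mode: open() after the
			// long-lived resource was already released.
			throw new IllegalStateException("table closed");
		}
		scanner = table.iterator();
	}

	String nextRecord() {
		return scanner.hasNext() ? scanner.next() : null;
	}

	void close() {
		// Release only per-split state, keeping the instance reopenable.
		scanner = null;
	}
}

class LifecycleContractDemo {
	public static void main(String[] args) {
		ReusableFormatSketch format = new ReusableFormatSketch();
		format.configure();                        // once per instance
		for (int split = 0; split < 2; split++) {  // the runtime reuses one instance across splits
			format.open();
			int count = 0;
			while (format.nextRecord() != null) {
				count++;
			}
			format.close();
			System.out.println("split " + split + " read " + count + " records");
		}
	}
}
```

The bug described in the issue corresponds to releasing the long-lived resource in close() (setting `table = null` there), which would make the second iteration of the loop hit the IllegalStateException path, mirroring the "hconnection-... closed" and RejectedExecutionException stack traces above.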