[
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15509368#comment-15509368
]
ASF GitHub Bot commented on FLINK-4311:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2330#discussion_r79791682
--- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
+	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+	// These are the row ids AND also the values we will put in the test table
+	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+	@Before
+	public void createTestTable() throws IOException {
+		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+		HTable table = openTable(tableName);
+
+		for (String rowId : ROW_IDS) {
+			byte[] rowIdBytes = rowId.getBytes();
+			Put p = new Put(rowIdBytes);
+			// Use the rowId as the cell value as well, to make verifying the results easier
+			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+			table.put(p);
+		}
+
+		table.close();
+	}
+
+	// Static so the input format can be serialized without dragging along the enclosing test class
+	static class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+		@Override
+		protected Scan getScanner() {
+			return new Scan();
+		}
+
+		@Override
+		protected String getTableName() {
+			return TEST_TABLE_NAME;
+		}
+
+		@Override
+		protected Tuple1<String> mapResultToTuple(Result r) {
+			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+		}
+	}
+
+	@Test
+	public void testTableInputFormat() {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		environment.setParallelism(1);
+
+		DataSet<String> resultDataSet =
+			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+				@Override
+				public String map(Tuple1<String> value) throws Exception {
+					return value.f0;
+				}
+			});
+
+		List<String> resultSet = new ArrayList<>();
+		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+		try {
+			environment.execute("HBase InputFormat Test");
+		} catch (Exception e) {
+			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
+		}
+
+		for (String rowId : ROW_IDS) {
--- End diff ---
Please add a check that `ROW_IDS` and `resultSet` have the same size to
ensure that each record is read exactly once.
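
A minimal sketch of what that check could look like (the `contains` loop body is illustrative, since the diff is cut off above):

```java
// Hypothetical verification block for testTableInputFormat():
// first assert the sizes match so duplicated or dropped records fail loudly,
// then check that every expected row id was actually read.
Assert.assertEquals("Each row must be read exactly once", ROW_IDS.length, resultSet.size());
for (String rowId : ROW_IDS) {
	assertTrue("Row " + rowId + " is missing from the result", resultSet.contains(rowId));
}
```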
> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.3
> Reporter: Niels Basjes
> Assignee: Niels Basjes
> Priority: Critical
>
> We have written a batch job that reads data from HBase by means of the
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
> 	at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
> 	at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
> 	at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
> 	at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
> 	at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> 	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> 	at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
> ... 10 more
> {quote}
> As you can see, the ThreadPoolExecutor had already been terminated at this point.
> We tracked it down to the fact that
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance, the open method will fail
> because the table has already been closed.
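> A rough sketch of the kind of fix this implies (names are illustrative, not
> the actual patch): open() must be able to re-establish the table and scanner
> for each split, instead of relying on configure() having done it once:
> {code}
> // Illustrative only: make open() self-sufficient so the format survives
> // the close()/open() cycle between splits.
> @Override
> public void open(TableInputSplit split) throws IOException {
> 	if (table == null) {
> 		table = createTable(); // hypothetical helper: re-create the HTable if close() released it
> 	}
> 	resultScanner = table.getScanner(getScanner());
> }
>
> @Override
> public void close() throws IOException {
> 	if (resultScanner != null) {
> 		resultScanner.close();
> 	}
> 	// leave re-creation of the table to the next open() call
> }
> {code}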
> We also found that this error varies with the version of HBase that is
> used. For example, I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
> 	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
> ... 37 more
> {quote}
> I found that the [documentation of the InputFormat
> interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
> clearly states:
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance
> can be opened again after it was closed. That is due to the fact that the
> input format is used for potentially multiple splits. After a split is done,
> the format's close function is invoked and, if another split is available,
> the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this
> constraint.
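> To make the contract concrete, the runtime drives roughly this sequence
> against a single input format instance (simplified sketch, not the actual
> runtime code):
> {code}
> format.configure(parameters);      // called once
> for (InputSplit split : assignedSplits) {
> 	format.open(split);            // must also work after a previous close()
> 	while (!format.reachedEnd()) {
> 		record = format.nextRecord(record);
> 	}
> 	format.close();                // called after every split
> }
> {code}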
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)