Repository: flink Updated Branches: refs/heads/master 9e17cbd6b -> a079259f3
[FLINK-4311] [hbase] Fix TableInputFormat to correctly process multiple input splits. This closes #2330 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f872792 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f872792 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f872792 Branch: refs/heads/master Commit: 3f8727921e944d1d89714f5885c2de63681d51b2 Parents: 9e17cbd Author: Niels Basjes <[email protected]> Authored: Wed Aug 3 14:54:34 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Mon Oct 10 20:36:20 2016 +0200 ---------------------------------------------------------------------- flink-batch-connectors/flink-hbase/pom.xml | 84 ++++++- .../flink/addons/hbase/TableInputFormat.java | 171 +++++++------ .../hbase/HBaseTestingClusterAutostarter.java | 238 +++++++++++++++++++ .../addons/hbase/TableInputFormatITCase.java | 120 ++++++++++ .../src/test/resources/hbase-site.xml | 43 ---- .../src/test/resources/log4j-test.properties | 23 ++ .../src/test/resources/log4j.properties | 23 -- .../api/common/io/ReplicatingInputFormat.java | 4 +- .../flink/api/common/io/RichInputFormat.java | 8 +- .../source/ContinuousFileReaderOperator.java | 9 +- 10 files changed, 579 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/pom.xml b/flink-batch-connectors/flink-hbase/pom.xml index 10c20a0..62e77d6 100644 --- a/flink-batch-connectors/flink-hbase/pom.xml +++ b/flink-batch-connectors/flink-hbase/pom.xml @@ -35,9 +35,36 @@ under the License. 
<properties> <hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version> - <hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version> + <hbase.hadoop2.version>1.1.2</hbase.hadoop2.version> </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <!-- Disable inherited shade-flink because of a problem in the shade plugin --> + <!-- When enabled you'll run into an infinite loop creating the dependency-reduced-pom.xml --> + <!-- Seems similar to https://issues.apache.org/jira/browse/MSHADE-148 --> + <id>shade-flink</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> <!-- core dependencies --> @@ -48,26 +75,34 @@ under the License. 
<version>${project.version}</version> <scope>provided</scope> </dependency> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>${shading-artifact.name}</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${project.version}</version> <scope>provided</scope> + + <!--Exclude Guava in order to run the HBaseMiniCluster during testing--> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> </dependency> <!-- HBase server needed for TableOutputFormat --> @@ -163,6 +198,45 @@ under the License. </exclusions> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> <profiles> @@ -178,7 +252,7 @@ under the License. 
<hbase.version>${hbase.hadoop1.version}</hbase.version> </properties> </profile> - + <profile> <id>hadoop-2</id> <repositories> http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java index 6ba6217..35b0a7c 100644 --- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -17,13 +17,9 @@ */ package org.apache.flink.addons.hbase; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; @@ -38,11 +34,14 @@ import org.apache.hadoop.hbase.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * {@link InputFormat} subclass that wraps the access for HTables. 
- * */ -public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit>{ +public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> { private static final long serialVersionUID = 1L; @@ -51,34 +50,56 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< /** helper variable to decide whether the input is exhausted or not */ private boolean endReached = false; - // TODO table and scan could be serialized when kryo serializer will be the default - protected transient HTable table; - protected transient Scan scan; + protected transient HTable table = null; + protected transient Scan scan = null; /** HBase iterator wrapper */ - private ResultScanner rs; + private ResultScanner resultScanner = null; private byte[] lastRow; private int scannedRows; - // abstract methods allow for multiple table and scanners in the same job + /** + * Returns an instance of Scan that retrieves the required subset of records from the HBase table. + * @return The appropriate instance of Scan for this usecase. + */ protected abstract Scan getScanner(); + + /** + * What table is to be read. + * Per instance of a TableInputFormat derivative only a single tablename is possible. + * @return The name of the table + */ protected abstract String getTableName(); + + /** + * The output from HBase is always an instance of {@link Result}. + * This method is to copy the data in the Result instance into the required {@link Tuple} + * @param r The Result instance from HBase that needs to be converted + * @return The appropriate instance of {@link Tuple} that contains the needed information. + */ protected abstract T mapResultToTuple(Result r); /** - * creates a {@link Scan} object and a {@link HTable} connection + * Creates a {@link Scan} object and opens the {@link HTable} connection. 
+ * These are opened here because they are needed in the createInputSplits + * which is called before the openInputFormat method. + * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}. * - * @param parameters + * @param parameters The configuration that is to be used * @see Configuration */ @Override public void configure(Configuration parameters) { - this.table = createTable(); - this.scan = getScanner(); + table = createTable(); + if (table != null) { + scan = getScanner(); + } } - /** Create an {@link HTable} instance and set it into this format */ + /** + * Create an {@link HTable} instance and set it into this format + */ private HTable createTable() { LOG.info("Initializing HBaseConfiguration"); //use files found in the classpath @@ -93,32 +114,51 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< } @Override + public void open(TableInputSplit split) throws IOException { + if (table == null) { + throw new IOException("The HBase table has not been opened!"); + } + if (scan == null) { + throw new IOException("getScanner returned null"); + } + if (split == null) { + throw new IOException("Input split is null!"); + } + + logSplitInfo("opening", split); + scan.setStartRow(split.getStartRow()); + lastRow = split.getEndRow(); + scan.setStopRow(lastRow); + + resultScanner = table.getScanner(scan); + endReached = false; + scannedRows = 0; + } + + @Override public boolean reachedEnd() throws IOException { - return this.endReached; + return endReached; } @Override public T nextRecord(T reuse) throws IOException { - if (this.rs == null){ + if (resultScanner == null) { throw new IOException("No table result scanner provided!"); } - try{ - Result res = this.rs.next(); - if (res != null){ + try { + Result res = resultScanner.next(); + if (res != null) { scannedRows++; lastRow = res.getRow(); return mapResultToTuple(res); } - }catch (Exception e) { - this.rs.close(); + } catch 
(Exception e) { + resultScanner.close(); //workaround for timeout on scan - StringBuffer logMsg = new StringBuffer("Error after scan of ") - .append(scannedRows) - .append(" rows. Retry with a new scanner..."); - LOG.warn(logMsg.toString(), e); - this.scan.setStartRow(lastRow); - this.rs = table.getScanner(scan); - Result res = this.rs.next(); + LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e); + scan.setStartRow(lastRow); + resultScanner = table.getScanner(scan); + Result res = resultScanner.next(); if (res != null) { scannedRows++; lastRow = res.getRow(); @@ -126,46 +166,43 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< } } - this.endReached = true; + endReached = true; return null; } @Override - public void open(TableInputSplit split) throws IOException { - if (split == null){ - throw new IOException("Input split is null!"); - } - if (table == null){ - throw new IOException("No HTable provided!"); - } - if (scan == null){ - throw new IOException("No Scan instance provided"); + public void close() throws IOException { + LOG.info("Closing split (scanned {} rows)", scannedRows); + lastRow = null; + try { + if (resultScanner != null) { + resultScanner.close(); + } + } finally { + resultScanner = null; } - - logSplitInfo("opening", split); - scan.setStartRow(split.getStartRow()); - lastRow = split.getEndRow(); - scan.setStopRow(lastRow); - - this.rs = table.getScanner(scan); - this.endReached = false; - this.scannedRows = 0; } @Override - public void close() throws IOException { - if(rs!=null){ - this.rs.close(); - } - if(table!=null){ - this.table.close(); + public void closeInputFormat() throws IOException { + try { + if (table != null) { + table.close(); + } + } finally { + table = null; } - LOG.info("Closing split (scanned {} rows)", scannedRows); - this.lastRow = null; } @Override public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException { + if (table == 
null) { + throw new IOException("The HBase table has not been opened!"); + } + if (scan == null) { + throw new IOException("getScanner returned null"); + } + //Gets the starting and ending row keys for every region in the currently open table final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { @@ -186,16 +223,16 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< continue; } //Finds the region on which the given row is being served - final String[] hosts = new String[] { regionLocation }; + final String[] hosts = new String[]{regionLocation}; // determine if regions contains keys used by the scan boolean isLastRegion = endKey.length == 0; if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) && - (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow; final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) - && !isLastRegion ? endKey : stopRow; + && !isLastRegion ? endKey : stopRow; int id = splits.size(); final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop); splits.add(split); @@ -215,7 +252,7 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< String splitStartKey = splitStart.isEmpty() ? "-" : splitStart; String splitStopKey = splitEnd.isEmpty() ? 
"-" : splitEnd; String[] hostnames = split.getHostnames(); - LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey); + LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey); } /** @@ -231,13 +268,11 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( * i.e. all regions are included). * - * @param startKey - * Start key of the region - * @param endKey - * End key of the region + * @param startKey Start key of the region + * @param endKey End key of the region * @return true, if this region needs to be included as part of the input (default). */ - private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { + protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { return true; } @@ -251,4 +286,4 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat< return null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java new file mode 100644 index 0000000..3d9f672 --- /dev/null +++ b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java @@ -0,0 +1,238 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; 
+import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * By using this class as the super class of a set of tests you will have a HBase testing + * cluster available that is very suitable for writing tests for scanning and filtering against. + * This is usable by any downstream application because the HBase cluster is 'injected' because + * a dynamically generated hbase-site.xml is added to the classpath. + * Because of this classpath manipulation it is not possible to start a second testing cluster in the same JVM. + * So if you have this you should either put all hbase related tests in a single class or force surefire to + * setup a new JVM for each testclass. + * See: http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html + */ +// +// NOTE: The code in this file is based on code from the +// Apache HBase project, licensed under the Apache License v 2.0 +// +// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java +// +public class HBaseTestingClusterAutostarter implements Serializable { + + private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HBaseAdmin admin = null; + private static List<TableName> createdTables = new ArrayList<>(); + + private static boolean alreadyRegisteredTestCluster = false; + + protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) { + LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString()); + + assertNotNull("HBaseAdmin is not initialized 
successfully.", admin); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName); + desc.addFamily(colDef); + + try { + admin.createTable(desc, splitKeys); + createdTables.add(tableName); + assertTrue("Fail to create the table", admin.tableExists(tableName)); + } catch (IOException e) { + assertNull("Exception found while creating table", e); + } + } + + protected static HTable openTable(TableName tableName) throws IOException { + HTable table = (HTable) admin.getConnection().getTable(tableName); + assertTrue("Fail to create the table", admin.tableExists(tableName)); + return table; + } + + private static void deleteTables() { + if (admin != null) { + for (TableName tableName : createdTables) { + try { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } catch (IOException e) { + assertNull("Exception found deleting the table", e); + } + } + } + } + + private static void initialize(Configuration conf) { + conf = HBaseConfiguration.create(conf); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + try { + admin = TEST_UTIL.getHBaseAdmin(); + } catch (MasterNotRunningException e) { + assertNull("Master is not running", e); + } catch (ZooKeeperConnectionException e) { + assertNull("Cannot connect to ZooKeeper", e); + } catch (IOException e) { + assertNull("IOException", e); + } + } + + @BeforeClass + public static void setUp() throws Exception { + LOG.info("HBase minicluster: Starting"); + ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + + TEST_UTIL.startMiniCluster(1); + + // https://issues.apache.org/jira/browse/HBASE-11711 + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1); + + // Make sure the zookeeper quorum value contains the right port number (varies 
per run). + TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort()); + + initialize(TEST_UTIL.getConfiguration()); + LOG.info("HBase minicluster: Running"); + } + + private static File hbaseSiteXmlDirectory; + private static File hbaseSiteXmlFile; + + /** + * This dynamically generates a hbase-site.xml file that is added to the classpath. + * This way this HBaseMinicluster can be used by an unmodified application. + * The downside is that this cannot be 'unloaded' so you can have only one per JVM. + */ + public static void registerHBaseMiniClusterInClasspath() { + if (alreadyRegisteredTestCluster) { + fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM"); + } + File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/")); + hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/"); + + if (!hbaseSiteXmlDirectory.mkdirs()) { + fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster"); + } + + assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster()); + + createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum")); + addDirectoryToClassPath(hbaseSiteXmlDirectory); + + // Avoid starting it again. + alreadyRegisteredTestCluster = true; + } + + private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) { + hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml"); + // Create the hbase-site.xml file for this run. 
+ try { + String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" + + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" + + "<configuration>\n" + + " <property>\n" + + " <name>hbase.zookeeper.quorum</name>\n" + + " <value>" + zookeeperQuorum + "</value>\n" + + " </property>\n" + + "</configuration>"; + OutputStream fos = new FileOutputStream(hbaseSiteXmlFile); + fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8)); + fos.close(); + } catch (IOException e) { + fail("Unable to create " + hbaseSiteXmlFile); + } + } + + private static void addDirectoryToClassPath(File directory) { + try { + // Get the classloader actually used by HBaseConfiguration + ClassLoader classLoader = HBaseConfiguration.create().getClassLoader(); + if (!(classLoader instanceof URLClassLoader)) { + fail("We should get a URLClassLoader"); + } + + // Make the addURL method accessible + Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); + method.setAccessible(true); + + // Add the directory where we put the hbase-site.xml to the classpath + method.invoke(classLoader, directory.toURI().toURL()); + } catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws Exception { + LOG.info("HBase minicluster: Shutting down"); + deleteTables(); + hbaseSiteXmlFile.delete(); + hbaseSiteXmlDirectory.delete(); + TEST_UTIL.shutdownMiniCluster(); + LOG.info("HBase minicluster: Down"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java new file mode 100644 index 0000000..3dddd88 --- /dev/null +++ b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TableInputFormatITCase extends HBaseTestingClusterAutostarter { + private static final String TEST_TABLE_NAME = "TableInputFormatTestTable"; + private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes(); + private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes(); + + // These are the row ids AND also the values we will put in the test table + private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"}; + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Before + public void createTestTable() throws IOException { + TableName tableName = TableName.valueOf(TEST_TABLE_NAME); + byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()}; + createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys); + HTable table = openTable(tableName); + + for (String rowId : ROW_IDS) { + byte[] rowIdBytes = rowId.getBytes(); + Put p = new Put(rowIdBytes); + // Use the rowId as the value to facilitate the testing better + p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes); + 
table.put(p); + } + + table.close(); + } + + class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> { + @Override + protected Scan getScanner() { + return new Scan(); + } + + @Override + protected String getTableName() { + return TEST_TABLE_NAME; + } + + @Override + protected Tuple1<String> mapResultToTuple(Result r) { + return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME))); + } + } + + @Test + public void testTableInputFormat() { + ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + environment.setParallelism(1); + + DataSet<String> resultDataSet = + environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() { + @Override + public String map(Tuple1<String> value) throws Exception { + return value.f0; + } + }); + + List<String> resultSet = new ArrayList<>(); + resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet)); + + try { + environment.execute("HBase InputFormat Test"); + } catch (Exception e) { + Assert.fail("HBase InputFormat test failed. 
" + e.getMessage()); + } + + for (String rowId : ROW_IDS) { + assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId)); + } + + assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml b/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml deleted file mode 100644 index 2984063..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> -<!-- -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ ---> -<configuration> - - <property> - <name>hbase.tmp.dir</name> - <!-- - <value>/media/Dati/hbase-0.98-data</value> - --> - <value>/opt/hbase-0.98.6.1-hadoop2/data</value> - - </property> - <property> - <name>hbase.zookeeper.quorum</name> - <value>localhost</value> - </property> - <!-- - <property> - <name>hadoop.security.group.mapping</name> - <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value> - </property> - --> -</configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..804ff45 --- /dev/null +++ b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +log4j.rootLogger=DEBUG, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.threshold=INFO +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties b/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties deleted file mode 100644 index d6eb2b2..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -log4j.rootLogger=${hadoop.root.logger} -hadoop.root.logger=INFO,console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java index 14dc8f4..c8aa591 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java @@ -135,14 +135,14 @@ public final class ReplicatingInputFormat<OT, S extends InputSplit> extends Rich } @Override - public void openInputFormat() { + public void openInputFormat() throws IOException { if (this.replicatedIF instanceof RichInputFormat) { ((RichInputFormat)this.replicatedIF).openInputFormat(); } } @Override - public void closeInputFormat() { + public void closeInputFormat() throws IOException { if (this.replicatedIF instanceof RichInputFormat) { ((RichInputFormat)this.replicatedIF).closeInputFormat(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java index 6ab5cc1..95dc1cd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java @@ -23,6 +23,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.core.io.InputSplit; +import java.io.IOException; + /** * An abstract stub implementation for Rich input formats. * Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}. @@ -56,9 +58,10 @@ public abstract class RichInputFormat<OT, T extends InputSplit> implements Input * Resources should be allocated in this method. (e.g. database connections, cache, etc.) * * @see InputFormat + * @throws IOException in case allocating the resources failed. */ @PublicEvolving - public void openInputFormat() { + public void openInputFormat() throws IOException { //do nothing here, just for subclasses } @@ -67,9 +70,10 @@ public abstract class RichInputFormat<OT, T extends InputSplit> implements Input * Resources allocated during {@link #openInputFormat()} should be closed in this method. 
* * @see InputFormat + * @throws IOException in case closing the resources failed */ @PublicEvolving - public void closeInputFormat() { + public void closeInputFormat() throws IOException { //do nothing here, just for subclasses } } http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 35e72a7..769cb6f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -326,12 +326,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } catch (Throwable e) { + getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e); + } finally { synchronized (checkpointLock) { LOG.info("Reader terminated, and exiting..."); - this.format.closeInputFormat(); + try { + this.format.closeInputFormat(); + } catch (IOException e) { + getContainingTask().handleAsyncException( + "Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e); + } this.isSplitOpen = false; this.currentSplit = null; this.isRunning = false;
