http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java deleted file mode 100644 index 35b0a7c..0000000 --- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.addons.hbase; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * {@link InputFormat} subclass that wraps the access for HTables. - */ -public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class); - - /** helper variable to decide whether the input is exhausted or not */ - private boolean endReached = false; - - protected transient HTable table = null; - protected transient Scan scan = null; - - /** HBase iterator wrapper */ - private ResultScanner resultScanner = null; - - private byte[] lastRow; - private int scannedRows; - - /** - * Returns an instance of Scan that retrieves the required subset of records from the HBase table. - * @return The appropriate instance of Scan for this usecase. - */ - protected abstract Scan getScanner(); - - /** - * What table is to be read. - * Per instance of a TableInputFormat derivative only a single tablename is possible. - * @return The name of the table - */ - protected abstract String getTableName(); - - /** - * The output from HBase is always an instance of {@link Result}. - * This method is to copy the data in the Result instance into the required {@link Tuple} - * @param r The Result instance from HBase that needs to be converted - * @return The approriate instance of {@link Tuple} that contains the needed information. - */ - protected abstract T mapResultToTuple(Result r); - - /** - * Creates a {@link Scan} object and opens the {@link HTable} connection. - * These are opened here because they are needed in the createInputSplits - * which is called before the openInputFormat method. - * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}. - * - * @param parameters The configuration that is to be used - * @see Configuration - */ - @Override - public void configure(Configuration parameters) { - table = createTable(); - if (table != null) { - scan = getScanner(); - } - } - - /** - * Create an {@link HTable} instance and set it into this format - */ - private HTable createTable() { - LOG.info("Initializing HBaseConfiguration"); - //use files found in the classpath - org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create(); - - try { - return new HTable(hConf, getTableName()); - } catch (Exception e) { - LOG.error("Error instantiating a new HTable instance", e); - } - return null; - } - - @Override - public void open(TableInputSplit split) throws IOException { - if (table == null) { - throw new IOException("The HBase table has not been opened!"); - } - if (scan == null) { - throw new IOException("getScanner returned null"); - } - if (split == null) { - throw new IOException("Input split is null!"); - } - - logSplitInfo("opening", split); - scan.setStartRow(split.getStartRow()); - lastRow = split.getEndRow(); - scan.setStopRow(lastRow); - - resultScanner = table.getScanner(scan); - endReached = false; - scannedRows = 0; - } - - @Override - public boolean reachedEnd() throws IOException { - return endReached; - } - - @Override - public T nextRecord(T reuse) throws IOException { - if (resultScanner == null) { - throw new IOException("No table result scanner provided!"); - } - try { - Result res = resultScanner.next(); - if (res != null) { - scannedRows++; - lastRow = res.getRow(); - return mapResultToTuple(res); - } - } catch (Exception e) { - resultScanner.close(); - //workaround for timeout on scan - LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e); - scan.setStartRow(lastRow); - resultScanner = table.getScanner(scan); - Result res = resultScanner.next(); - if (res != null) { - scannedRows++; - lastRow = res.getRow(); - return mapResultToTuple(res); - } - } - - endReached = true; - return null; - } - - @Override - public void close() throws IOException { - LOG.info("Closing split (scanned {} rows)", scannedRows); - lastRow = null; - try { - if (resultScanner != null) { - resultScanner.close(); - } - } finally { - resultScanner = null; - } - } - - @Override - public void closeInputFormat() throws IOException { - try { - if (table != null) { - table.close(); - } - } finally { - table = null; - } - } - - @Override - public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException { - if (table == null) { - throw new IOException("The HBase table has not been opened!"); - } - if (scan == null) { - throw new IOException("getScanner returned null"); - } - - //Gets the starting and ending row keys for every region in the currently open table - final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - throw new IOException("Expecting at least one region."); - } - final byte[] startRow = scan.getStartRow(); - final byte[] stopRow = scan.getStopRow(); - final boolean scanWithNoLowerBound = startRow.length == 0; - final boolean scanWithNoUpperBound = stopRow.length == 0; - - final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits); - for (int i = 0; i < keys.getFirst().length; i++) { - final byte[] startKey = keys.getFirst()[i]; - final byte[] endKey = keys.getSecond()[i]; - final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort(); - //Test if the given region is to be included in the InputSplit while splitting the regions of a table - if (!includeRegionInSplit(startKey, endKey)) { - continue; - } - //Finds the region on which the given row is being served - final String[] hosts = new String[]{regionLocation}; - - // determine if regions contains keys used by the scan - boolean isLastRegion = endKey.length == 0; - if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) && - (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { - - final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow; - final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) - && !isLastRegion ? endKey : stopRow; - int id = splits.size(); - final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop); - splits.add(split); - } - } - LOG.info("Created " + splits.size() + " splits"); - for (TableInputSplit split : splits) { - logSplitInfo("created", split); - } - return splits.toArray(new TableInputSplit[0]); - } - - private void logSplitInfo(String action, TableInputSplit split) { - int splitId = split.getSplitNumber(); - String splitStart = Bytes.toString(split.getStartRow()); - String splitEnd = Bytes.toString(split.getEndRow()); - String splitStartKey = splitStart.isEmpty() ? "-" : splitStart; - String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd; - String[] hostnames = split.getHostnames(); - LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey); - } - - /** - * Test if the given region is to be included in the InputSplit while splitting the regions of a table. - * <p> - * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, - * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> - * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R - * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due - * to the ordering of the keys. <br> - * <br> - * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br> - * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( - * i.e. all regions are included). - * - * @param startKey Start key of the region - * @param endKey End key of the region - * @return true, if this region needs to be included as part of the input (default). - */ - protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { - return true; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { - return null; - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java deleted file mode 100644 index 75f0b9b..0000000 --- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase; - -import org.apache.flink.core.io.LocatableInputSplit; - -/** - * This class implements a input splits for HBase. Each table input split corresponds to a key range (low, high). All - * references to row below refer to the key of the row. - */ -public class TableInputSplit extends LocatableInputSplit { - - private static final long serialVersionUID = 1L; - - /** The name of the table to retrieve data from */ - private final byte[] tableName; - - /** The start row of the split. */ - private final byte[] startRow; - - /** The end row of the split. */ - private final byte[] endRow; - - /** - * Creates a new table input split - * - * @param splitNumber - * the number of the input split - * @param hostnames - * the names of the hosts storing the data the input split refers to - * @param tableName - * the name of the table to retrieve data from - * @param startRow - * the start row of the split - * @param endRow - * the end row of the split - */ - TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow, - final byte[] endRow) { - super(splitNumber, hostnames); - - this.tableName = tableName; - this.startRow = startRow; - this.endRow = endRow; - } - - /** - * Returns the table name. - * - * @return The table name. - */ - public byte[] getTableName() { - return this.tableName; - } - - /** - * Returns the start row. - * - * @return The start row. - */ - public byte[] getStartRow() { - return this.startRow; - } - - /** - * Returns the end row. - * - * @return The end row. - */ - public byte[] getEndRow() { - return this.endRow; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java deleted file mode 100644 index 3d9f672..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.log4j.Level; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * By using this class as the super class of a set of tests you will have a HBase testing - * cluster available that is very suitable for writing tests for scanning and filtering against. - * This is usable by any downstream application because the HBase cluster is 'injected' because - * a dynamically generated hbase-site.xml is added to the classpath. - * Because of this classpath manipulation it is not possible to start a second testing cluster in the same JVM. - * So if you have this you should either put all hbase related tests in a single class or force surefire to - * setup a new JVM for each testclass. - * See: http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html - */ -// -// NOTE: The code in this file is based on code from the -// Apache HBase project, licensed under the Apache License v 2.0 -// -// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java -// -public class HBaseTestingClusterAutostarter implements Serializable { - - private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class); - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static HBaseAdmin admin = null; - private static List<TableName> createdTables = new ArrayList<>(); - - private static boolean alreadyRegisteredTestCluster = false; - - protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) { - LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString()); - - assertNotNull("HBaseAdmin is not initialized successfully.", admin); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName); - desc.addFamily(colDef); - - try { - admin.createTable(desc, splitKeys); - createdTables.add(tableName); - assertTrue("Fail to create the table", admin.tableExists(tableName)); - } catch (IOException e) { - assertNull("Exception found while creating table", e); - } - } - - protected static HTable openTable(TableName tableName) throws IOException { - HTable table = (HTable) admin.getConnection().getTable(tableName); - assertTrue("Fail to create the table", admin.tableExists(tableName)); - return table; - } - - private static void deleteTables() { - if (admin != null) { - for (TableName tableName : createdTables) { - try { - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - } catch (IOException e) { - assertNull("Exception found deleting the table", e); - } - } - } - } - - private static void initialize(Configuration conf) { - conf = HBaseConfiguration.create(conf); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - try { - admin = TEST_UTIL.getHBaseAdmin(); - } catch (MasterNotRunningException e) { - assertNull("Master is not running", e); - } catch (ZooKeeperConnectionException e) { - assertNull("Cannot connect to ZooKeeper", e); - } catch (IOException e) { - assertNull("IOException", e); - } - } - - @BeforeClass - public static void setUp() throws Exception { - LOG.info("HBase minicluster: Starting"); - ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); - - TEST_UTIL.startMiniCluster(1); - - // https://issues.apache.org/jira/browse/HBASE-11711 - TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1); - - // Make sure the zookeeper quorum value contains the right port number (varies per run). - TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort()); - - initialize(TEST_UTIL.getConfiguration()); - LOG.info("HBase minicluster: Running"); - } - - private static File hbaseSiteXmlDirectory; - private static File hbaseSiteXmlFile; - - /** - * This dynamically generates a hbase-site.xml file that is added to the classpath. - * This way this HBaseMinicluster can be used by an unmodified application. - * The downside is that this cannot be 'unloaded' so you can have only one per JVM. - */ - public static void registerHBaseMiniClusterInClasspath() { - if (alreadyRegisteredTestCluster) { - fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM"); - } - File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/")); - hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/"); - - if (!hbaseSiteXmlDirectory.mkdirs()) { - fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster"); - } - - assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster()); - - createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum")); - addDirectoryToClassPath(hbaseSiteXmlDirectory); - - // Avoid starting it again. - alreadyRegisteredTestCluster = true; - } - - private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) { - hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml"); - // Create the hbase-site.xml file for this run. - try { - String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" + - "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" + - "<configuration>\n" + - " <property>\n" + - " <name>hbase.zookeeper.quorum</name>\n" + - " <value>" + zookeeperQuorum + "</value>\n" + - " </property>\n" + - "</configuration>"; - OutputStream fos = new FileOutputStream(hbaseSiteXmlFile); - fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8)); - fos.close(); - } catch (IOException e) { - fail("Unable to create " + hbaseSiteXmlFile); - } - } - - private static void addDirectoryToClassPath(File directory) { - try { - // Get the classloader actually used by HBaseConfiguration - ClassLoader classLoader = HBaseConfiguration.create().getClassLoader(); - if (!(classLoader instanceof URLClassLoader)) { - fail("We should get a URLClassLoader"); - } - - // Make the addURL method accessible - Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); - method.setAccessible(true); - - // Add the directory where we put the hbase-site.xml to the classpath - method.invoke(classLoader, directory.toURI().toURL()); - } catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage()); - } - } - - @AfterClass - public static void tearDown() throws Exception { - LOG.info("HBase minicluster: Shutting down"); - deleteTables(); - hbaseSiteXmlFile.delete(); - hbaseSiteXmlDirectory.delete(); - TEST_UTIL.shutdownMiniCluster(); - LOG.info("HBase minicluster: Down"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java deleted file mode 100644 index 3dddd88..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TableInputFormatITCase extends HBaseTestingClusterAutostarter { - private static final String TEST_TABLE_NAME = "TableInputFormatTestTable"; - private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes(); - private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes(); - - // These are the row ids AND also the values we will put in the test table - private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"}; - - @BeforeClass - public static void activateHBaseCluster(){ - registerHBaseMiniClusterInClasspath(); - } - - @Before - public void createTestTable() throws IOException { - TableName tableName = TableName.valueOf(TEST_TABLE_NAME); - byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()}; - createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys); - HTable table = openTable(tableName); - - for (String rowId : ROW_IDS) { - byte[] rowIdBytes = rowId.getBytes(); - Put p = new Put(rowIdBytes); - // Use the rowId as the value to facilitate the testing better - p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes); - table.put(p); - } - - table.close(); - } - - class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> { - @Override - protected Scan getScanner() { - return new Scan(); - } - - @Override - protected String getTableName() { - return TEST_TABLE_NAME; - } - - @Override - protected Tuple1<String> mapResultToTuple(Result r) { - return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME))); - } - } - - @Test - public void testTableInputFormat() { - ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); - environment.setParallelism(1); - - DataSet<String> resultDataSet = - environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() { - @Override - public String map(Tuple1<String> value) throws Exception { - return value.f0; - } - }); - - List<String> resultSet = new ArrayList<>(); - resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet)); - - try { - environment.execute("HBase InputFormat Test"); - } catch (Exception e) { - Assert.fail("HBase InputFormat test failed. " + e.getMessage()); - } - - for (String rowId : ROW_IDS) { - assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId)); - } - - assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java deleted file mode 100644 index 8579dee..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -public class HBaseFlinkTestConstants { - - public static final byte[] CF_SOME = "someCf".getBytes(); - public static final byte[] Q_SOME = "someQual".getBytes(); - public static final String TEST_TABLE_NAME = "test-table"; - public static final String TMP_DIR = "/tmp/test"; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java deleted file mode 100644 index dccf876..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -import org.apache.flink.addons.hbase.TableInputFormat; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Simple stub for HBase DataSet read - * - * To run the test first create the test table with hbase shell. - * - * Use the following commands: - * <ul> - * <li>create 'test-table', 'someCf'</li> - * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li> - * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li> - * </ul> - * - * The test should return just the first entry. - * - */ -public class HBaseReadExample { - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - @SuppressWarnings("serial") - DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() { - - @Override - public String getTableName() { - return HBaseFlinkTestConstants.TEST_TABLE_NAME; - } - - @Override - protected Scan getScanner() { - Scan scan = new Scan(); - scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME); - return scan; - } - - private Tuple2<String, String> reuse = new Tuple2<String, String>(); - - @Override - protected Tuple2<String, String> mapResultToTuple(Result r) { - String key = Bytes.toString(r.getRow()); - String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME)); - reuse.setField(key, 0); - reuse.setField(val, 1); - return reuse; - } - }) - .filter(new FilterFunction<Tuple2<String,String>>() { - - @Override - public boolean filter(Tuple2<String, String> t) throws Exception { - String val = t.getField(1); - if(val.startsWith("someStr")) - return true; - return false; - } - }); - - hbaseDs.print(); - - // kick off execution. - env.execute(); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java deleted file mode 100644 index 483bdff..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; - -/** - * Simple stub for HBase DataSet write - * - * To run the test first create the test table with hbase shell. - * - * Use the following commands: - * <ul> - * <li>create 'test-table', 'someCf'</li> - * </ul> - * - */ -@SuppressWarnings("serial") -public class HBaseWriteExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet<String> text = getTextDataSet(env); - - DataSet<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - // emit result - Job job = Job.getInstance(); - job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName); - // TODO is "mapred.output.dir" really useful? - job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR); - counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() { - private transient Tuple2<Text, Mutation> reuse; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - reuse = new Tuple2<Text, Mutation>(); - } - - @Override - public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception { - reuse.f0 = new Text(t.f0); - Put put = new Put(t.f0.getBytes()); - put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1)); - reuse.f1 = put; - return reuse; - } - }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job)); - - // execute program - env.execute("WordCount (HBase sink) Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined - * FlatMapFunction. The function takes a line (String) and splits it into - * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). - */ - public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { - - @Override - public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - private static boolean fileOutput = false; - private static String textPath; - private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - textPath = args[0]; - outputTableName = args[1]; - } else { - System.err.println("Usage: HBaseWriteExample <text path> <output table>"); - return false; - } - } else { - System.out.println("Executing HBaseWriteExample example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: HBaseWriteExample <text path> <output table>"); - } - return true; - } - - private static DataSet<String> getTextDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return getDefaultTextLineDataSet(env); - } - } - private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { - return env.fromElements(WORDS); - } - private static final String[] WORDS = new String[] { - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,", - "And by opposing end them?--To die,--to sleep,--", - "No more; and by a sleep to say we end", - "The heartache, and the thousand natural shocks", - "That flesh is heir to,--'tis a consummation", - "Devoutly to be wish'd. To die,--to sleep;--", - "To sleep! perchance to dream:--ay, there's the rub;", - "For in that sleep of death what dreams may come,", - "When we have shuffled off this mortal coil,", - "Must give us pause: there's the respect", - "That makes calamity of so long life;", - "For who would bear the whips and scorns of time,", - "The oppressor's wrong, the proud man's contumely,", - "The pangs of despis'd love, the law's delay,", - "The insolence of office, and the spurns", - "That patient merit of the unworthy takes,", - "When he himself might his quietus make", - "With a bare bodkin? who would these fardels bear,", - "To grunt and sweat under a weary life,", - "But that the dread of something after death,--", - "The undiscover'd country, from whose bourn", - "No traveller returns,--puzzles the will,", - "And makes us rather bear those ills we have", - "Than fly to others that we know not of?", - "Thus conscience does make cowards of us all;", - "And thus the native hue of resolution", - "Is sicklied o'er with the pale cast of thought;", - "And enterprises of great pith and moment,", - "With this regard, their currents turn awry,", - "And lose the name of action.--Soft you now!", - "The fair Ophelia!--Nymph, in thy orisons", - "Be all my sins remember'd." - }; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java deleted file mode 100644 index 05398db..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -import java.io.IOException; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * - * This is an example how to write streams into HBase. In this example the - * stream will be written into a local Hbase but it is possible to adapt this - * example for an HBase running in a cloud. You need a running local HBase with a - * table "flinkExample" and a column "entry". If your HBase configuration does - * not fit the hbase-site.xml in the resource folder then you gave to delete temporary this - * hbase-site.xml to execute the example properly. - * - */ -public class HBaseWriteStreamExample { - - public static void main(String[] args) throws Exception { - final StreamExecutionEnvironment env = StreamExecutionEnvironment - .getExecutionEnvironment(); - - // data stream with random numbers - DataStream<String> dataStream = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 1L; - - private volatile boolean isRunning = true; - - @Override - public void run(SourceContext<String> out) throws Exception { - while (isRunning) { - out.collect(String.valueOf(Math.floor(Math.random() * 100))); - } - - } - - @Override - public void cancel() { - isRunning = false; - } - }); - dataStream.writeUsingOutputFormat(new HBaseOutputFormat()); - - env.execute(); - } - - /** - * - * This class implements an OutputFormat for HBase - * - */ - private static class HBaseOutputFormat implements OutputFormat<String> { - - private org.apache.hadoop.conf.Configuration conf = null; - private HTable table = null; - private String taskNumber = null; - private int rowNumber = 0; - - private static final long serialVersionUID = 1L; - - @Override - public void configure(Configuration parameters) { - conf = HBaseConfiguration.create(); - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - table = new HTable(conf, "flinkExample"); - this.taskNumber = String.valueOf(taskNumber); - } - - @Override - public void writeRecord(String record) throws IOException { - Put put = new Put(Bytes.toBytes(taskNumber + rowNumber)); - put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"), - Bytes.toBytes(rowNumber)); - rowNumber++; - table.put(put); - } - - @Override - public void close() throws IOException { - table.flushCommits(); - table.close(); - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties deleted file mode 100644 index 804ff45..0000000 --- a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -log4j.rootLogger=DEBUG, stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.threshold=INFO -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml deleted file mode 100644 index 6889e5a..0000000 --- a/flink-batch-connectors/flink-hcatalog/pom.xml +++ /dev/null @@ -1,182 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-batch-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-hcatalog</artifactId> - <name>flink-hcatalog</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility_2.10</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hcatalog-core</artifactId> - <version>0.12.0</version> - <exclusions> - <exclusion> - <groupId>org.json</groupId> - <artifactId>json</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <scope>provided</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Scala Code Style, most of the configuration done via plugin management --> - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <configuration> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - </configuration> - </plugin> - - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java deleted file mode 100644 index 859b706..0000000 --- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hcatalog; - -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; -import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.DefaultHCatRecord; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * A InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple. - * - * Note: Flink tuples might only support a limited number of fields (depending on the API). - * - * @param <T> - */ -public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> { - - private static final long serialVersionUID = 1L; - - private Configuration configuration; - - private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat; - private RecordReader<WritableComparable, HCatRecord> recordReader; - private boolean fetched = false; - private boolean hasNext; - - protected String[] fieldNames = new String[0]; - protected HCatSchema outputSchema; - - private TypeInformation<T> resultType; - - public HCatInputFormatBase() { } - - /** - * Creates a HCatInputFormat for the given database and table. - * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. - * The return type of the InputFormat can be changed to Flink-native tuples by calling - * {@link HCatInputFormatBase#asFlinkTuples()}. - * - * @param database The name of the database to read from. - * @param table The name of the table to read. - * @throws java.io.IOException - */ - public HCatInputFormatBase(String database, String table) throws IOException { - this(database, table, new Configuration()); - } - - /** - * Creates a HCatInputFormat for the given database, table, and - * {@link org.apache.hadoop.conf.Configuration}. - * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. - * The return type of the InputFormat can be changed to Flink-native tuples by calling - * {@link HCatInputFormatBase#asFlinkTuples()}. - * - * @param database The name of the database to read from. - * @param table The name of the table to read. - * @param config The Configuration for the InputFormat. - * @throws java.io.IOException - */ - public HCatInputFormatBase(String database, String table, Configuration config) throws IOException { - super(); - this.configuration = config; - HadoopUtils.mergeHadoopConf(this.configuration); - - this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table); - this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration); - - // configure output schema of HCatFormat - configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); - // set type information - this.resultType = new WritableTypeInfo(DefaultHCatRecord.class); - } - - /** - * Specifies the fields which are returned by the InputFormat and their order. - * - * @param fields The fields and their order which are returned by the InputFormat. - * @return This InputFormat with specified return fields. - * @throws java.io.IOException - */ - public HCatInputFormatBase<T> getFields(String... fields) throws IOException { - - // build output schema - ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length); - for(String field : fields) { - fieldSchemas.add(this.outputSchema.get(field)); - } - this.outputSchema = new HCatSchema(fieldSchemas); - - // update output schema configuration - configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); - - return this; - } - - /** - * Specifies a SQL-like filter condition on the table's partition columns. - * Filter conditions on non-partition columns are invalid. - * A partition filter can significantly reduce the amount of data to be read. - * - * @param filter A SQL-like filter condition on the table's partition columns. - * @return This InputFormat with specified partition filter. - * @throws java.io.IOException - */ - public HCatInputFormatBase<T> withFilter(String filter) throws IOException { - - // set filter - this.hCatInputFormat.setFilter(filter); - - return this; - } - - /** - * Specifies that the InputFormat returns Flink tuples instead of - * {@link org.apache.hive.hcatalog.data.HCatRecord}. - * - * Note: Flink tuples might only support a limited number of fields (depending on the API). - * - * @return This InputFormat. - * @throws org.apache.hive.hcatalog.common.HCatException - */ - public HCatInputFormatBase<T> asFlinkTuples() throws HCatException { - - // build type information - int numFields = outputSchema.getFields().size(); - if(numFields > this.getMaxFlinkTupleSize()) { - throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+ - " fields can be returned as Flink tuples."); - } - - TypeInformation[] fieldTypes = new TypeInformation[numFields]; - fieldNames = new String[numFields]; - for (String fieldName : outputSchema.getFieldNames()) { - HCatFieldSchema field = outputSchema.get(fieldName); - - int fieldPos = outputSchema.getPosition(fieldName); - TypeInformation fieldType = getFieldType(field); - - fieldTypes[fieldPos] = fieldType; - fieldNames[fieldPos] = fieldName; - - } - this.resultType = new TupleTypeInfo(fieldTypes); - - return this; - } - - protected abstract int getMaxFlinkTupleSize(); - - private TypeInformation getFieldType(HCatFieldSchema fieldSchema) { - - switch(fieldSchema.getType()) { - case INT: - return BasicTypeInfo.INT_TYPE_INFO; - case TINYINT: - return BasicTypeInfo.BYTE_TYPE_INFO; - case SMALLINT: - return BasicTypeInfo.SHORT_TYPE_INFO; - case BIGINT: - return BasicTypeInfo.LONG_TYPE_INFO; - case BOOLEAN: - return BasicTypeInfo.BOOLEAN_TYPE_INFO; - case FLOAT: - return BasicTypeInfo.FLOAT_TYPE_INFO; - case DOUBLE: - return BasicTypeInfo.DOUBLE_TYPE_INFO; - case STRING: - return BasicTypeInfo.STRING_TYPE_INFO; - case BINARY: - return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; - case ARRAY: - return new GenericTypeInfo(List.class); - case MAP: - return new GenericTypeInfo(Map.class); - case STRUCT: - return new GenericTypeInfo(List.class); - default: - throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered."); - } - } - - /** - * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat. - * - * @return The Configuration of the HCatInputFormat. - */ - public Configuration getConfiguration() { - return this.configuration; - } - - /** - * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord} - * returned by this InputFormat. - * - * @return The HCatSchema of the HCatRecords returned by this InputFormat. - */ - public HCatSchema getOutputSchema() { - return this.outputSchema; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(org.apache.flink.configuration.Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // no statistics provided at the moment - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - List<InputSplit> splits; - try { - splits = this.hCatInputFormat.getSplits(jobContext); - } catch (InterruptedException e) { - throw new IOException("Could not get Splits.", e); - } - HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; - - for(int i = 0; i < hadoopInputSplits.length; i++){ - hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); - } - return hadoopInputSplits; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - TaskAttemptContext context = null; - try { - context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); - } catch(Exception e) { - throw new RuntimeException(e); - } - - try { - this.recordReader = this.hCatInputFormat - .createRecordReader(split.getHadoopInputSplit(), context); - this.recordReader.initialize(split.getHadoopInputSplit(), context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordReader.", e); - } finally { - this.fetched = false; - } - } - - @Override - public boolean reachedEnd() throws IOException { - if(!this.fetched) { - fetchNext(); - } - return !this.hasNext; - } - - private void fetchNext() throws IOException { - try { - this.hasNext = this.recordReader.nextKeyValue(); - } catch (InterruptedException e) { - throw new IOException("Could not fetch next KeyValue pair.", e); - } finally { - this.fetched = true; - } - } - - @Override - public T nextRecord(T record) throws IOException { - if(!this.fetched) { - // first record - fetchNext(); - } - if(!this.hasNext) { - return null; - } - try { - - // get next HCatRecord - HCatRecord v = this.recordReader.getCurrentValue(); - this.fetched = false; - - if(this.fieldNames.length > 0) { - // return as Flink tuple - return this.buildFlinkTuple(record, v); - - } else { - // return as HCatRecord - return (T)v; - } - - } catch (InterruptedException e) { - throw new IOException("Could not get next record.", e); - } - } - - protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException; - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Custom de/serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeInt(this.fieldNames.length); - for(String fieldName : this.fieldNames) { - out.writeUTF(fieldName); - } - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - this.fieldNames = new String[in.readInt()]; - for(int i=0; i<this.fieldNames.length; i++) { - this.fieldNames[i] = in.readUTF(); - } - - Configuration configuration = new Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat(); - this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema")); - } - - // -------------------------------------------------------------------------------------------- - // Result type business - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<T> getProducedType() { - return this.resultType; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java deleted file mode 100644 index 46f3cd5..0000000 --- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hcatalog.java; - - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.hcatalog.HCatInputFormatBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.data.HCatRecord; - -/** - * A InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. - * Flink tuples support only up to 25 fields. - * - * @param <T> - */ -public class HCatInputFormat<T> extends HCatInputFormatBase<T> { - private static final long serialVersionUID = 1L; - - public HCatInputFormat() {} - - public HCatInputFormat(String database, String table) throws Exception { - super(database, table); - } - - public HCatInputFormat(String database, String table, Configuration config) throws Exception { - super(database, table, config); - } - - - @Override - protected int getMaxFlinkTupleSize() { - return 25; - } - - @Override - protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException { - - Tuple tuple = (Tuple)t; - - // Extract all fields from HCatRecord - for(int i=0; i < this.fieldNames.length; i++) { - - // get field value - Object o = record.get(this.fieldNames[i], this.outputSchema); - - // Set field value in Flink tuple. - // Partition columns are returned as String and - // need to be converted to original type. - switch(this.outputSchema.get(i).getType()) { - case INT: - if(o instanceof String) { - tuple.setField(Integer.parseInt((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case TINYINT: - if(o instanceof String) { - tuple.setField(Byte.parseByte((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case SMALLINT: - if(o instanceof String) { - tuple.setField(Short.parseShort((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case BIGINT: - if(o instanceof String) { - tuple.setField(Long.parseLong((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case BOOLEAN: - if(o instanceof String) { - tuple.setField(Boolean.parseBoolean((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case FLOAT: - if(o instanceof String) { - tuple.setField(Float.parseFloat((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case DOUBLE: - if(o instanceof String) { - tuple.setField(Double.parseDouble((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case STRING: - tuple.setField(o, i); - break; - case BINARY: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type BINARY."); - } else { - tuple.setField(o, i); - } - break; - case ARRAY: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type ARRAY."); - } else { - tuple.setField(o, i); - } - break; - case MAP: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type MAP."); - } else { - tuple.setField(o, i); - } - break; - case STRUCT: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type STRUCT."); - } else { - tuple.setField(o, i); - } - break; - default: - throw new RuntimeException("Invalid Type"); - } - } - - return (T)tuple; - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala deleted file mode 100644 index 0299ee1..0000000 --- a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hcatalog.scala - -import org.apache.flink.configuration -import org.apache.flink.hcatalog.HCatInputFormatBase -import org.apache.hadoop.conf.Configuration -import org.apache.hive.hcatalog.data.HCatRecord -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema - -/** - * A InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as [[HCatRecord]] or Scala tuples. - * Scala tuples support only up to 22 fields. - * - */ -class HCatInputFormat[T]( - database: String, - table: String, - config: Configuration - ) extends HCatInputFormatBase[T](database, table, config) { - - def this(database: String, table: String) { - this(database, table, new Configuration) - } - - var vals: Array[Any] = Array[Any]() - - override def configure(parameters: configuration.Configuration): Unit = { - super.configure(parameters) - vals = new Array[Any](fieldNames.length) - } - - override protected def getMaxFlinkTupleSize: Int = 22 - - override protected def buildFlinkTuple(t: T, record: HCatRecord): T = { - - // Extract all fields from HCatRecord - var i: Int = 0 - while (i < this.fieldNames.length) { - - val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema) - - // partition columns are returned as String - // Check and convert to actual type. - this.outputSchema.get(i).getType match { - case HCatFieldSchema.Type.INT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt - } - else { - vals(i) = o.asInstanceOf[Int] - } - case HCatFieldSchema.Type.TINYINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt.toByte - } - else { - vals(i) = o.asInstanceOf[Byte] - } - case HCatFieldSchema.Type.SMALLINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt.toShort - } - else { - vals(i) = o.asInstanceOf[Short] - } - case HCatFieldSchema.Type.BIGINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toLong - } - else { - vals(i) = o.asInstanceOf[Long] - } - case HCatFieldSchema.Type.BOOLEAN => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toBoolean - } - else { - vals(i) = o.asInstanceOf[Boolean] - } - case HCatFieldSchema.Type.FLOAT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toFloat - } - else { - vals(i) = o.asInstanceOf[Float] - } - case HCatFieldSchema.Type.DOUBLE => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toDouble - } - else { - vals(i) = o.asInstanceOf[Double] - } - case HCatFieldSchema.Type.STRING => - vals(i) = o - case HCatFieldSchema.Type.BINARY => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type BINARY.") - } - else { - vals(i) = o.asInstanceOf[Array[Byte]] - } - case HCatFieldSchema.Type.ARRAY => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type ARRAY.") - } - else { - vals(i) = o.asInstanceOf[List[Object]] - } - case HCatFieldSchema.Type.MAP => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type MAP.") - } - else { - vals(i) = o.asInstanceOf[Map[Object, Object]] - } - case HCatFieldSchema.Type.STRUCT => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type STRUCT.") - } - else { - vals(i) = o.asInstanceOf[List[Object]] - } - case _ => - throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType + - " encountered.") - } - - i += 1 - } - createScalaTuple(vals) - } - - private def createScalaTuple(vals: Array[Any]): T = { - - this.fieldNames.length match { - case 1 => - new Tuple1(vals(0)).asInstanceOf[T] - case 2 => - new Tuple2(vals(0), vals(1)).asInstanceOf[T] - case 3 => - new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T] - case 4 => - new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T] - case 5 => - new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T] - case 6 => - new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T] - case 7 => - new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T] - case 8 => - new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7)) - .asInstanceOf[T] - case 9 => - new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8)).asInstanceOf[T] - case 10 => - new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9)).asInstanceOf[T] - case 11 => - new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10)).asInstanceOf[T] - case 12 => - new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T] - case 13 => - new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T] - case 14 => - new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T] - case 15 => - new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T] - case 16 => - new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15)) - .asInstanceOf[T] - case 17 => - new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16)).asInstanceOf[T] - case 18 => - new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17)).asInstanceOf[T] - case 19 => - new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18)).asInstanceOf[T] - case 20 => - new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T] - case 21 => - new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T] - case 22 => - new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T] - case _ => - throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.") - - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/pom.xml b/flink-batch-connectors/flink-jdbc/pom.xml deleted file mode 100644 index 40779ba..0000000 --- a/flink-batch-connectors/flink-jdbc/pom.xml +++ /dev/null @@ -1,66 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-batch-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-jdbc</artifactId> - <name>flink-jdbc</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.10.1.1</version> - <scope>test</scope> - </dependency> - </dependencies> -</project>
