http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java new file mode 100644 index 0000000..3dddd88 --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.addons.hbase;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Integration test for {@link TableInputFormat}: fills a pre-split table in the HBase cluster
 * provided by {@link HBaseTestingClusterAutostarter} and checks that a Flink job reads every row back.
 */
public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
	private static final byte[] TEST_TABLE_FAMILY_NAME = utf8Bytes("F");
	private static final byte[] TEST_TABLE_COLUMN_NAME = utf8Bytes("Col");

	// These are the row ids AND also the values we will put in the test table
	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};

	@BeforeClass
	public static void activateHBaseCluster() {
		registerHBaseMiniClusterInClasspath();
	}

	/**
	 * Creates the test table split at "0", "3", "6" and "9" and writes one row per entry of
	 * {@link #ROW_IDS}, storing the row id itself as the cell value.
	 */
	@Before
	public void createTestTable() throws IOException {
		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
		// split keys spread the rows over several regions, i.e. several input splits
		byte[][] splitKeys = {utf8Bytes("0"), utf8Bytes("3"), utf8Bytes("6"), utf8Bytes("9")};
		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);

		// try-with-resources closes the table even if one of the puts fails
		try (HTable table = openTable(tableName)) {
			for (String rowId : ROW_IDS) {
				byte[] rowIdBytes = utf8Bytes(rowId);
				Put p = new Put(rowIdBytes);
				// Use the rowId as the value to facilitate the testing better
				p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
				table.put(p);
			}
		}
	}

	/**
	 * Reads the whole test table and emits each row's cell value as a single-field tuple.
	 *
	 * <p>Static so that the serialized input format does not drag the enclosing test instance along.
	 */
	static class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
		private static final long serialVersionUID = 1L;

		@Override
		protected Scan getScanner() {
			return new Scan();
		}

		@Override
		protected String getTableName() {
			return TEST_TABLE_NAME;
		}

		@Override
		protected Tuple1<String> mapResultToTuple(Result r) {
			byte[] value = r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME);
			return new Tuple1<>(new String(value, StandardCharsets.UTF_8));
		}
	}

	@Test
	public void testTableInputFormat() throws Exception {
		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
		environment.setParallelism(1);

		DataSet<String> resultDataSet =
			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
				@Override
				public String map(Tuple1<String> value) throws Exception {
					return value.f0;
				}
			});

		List<String> resultSet = new ArrayList<>();
		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));

		// a failing job propagates, so JUnit reports the full cause instead of only its message
		environment.execute("HBase InputFormat Test");

		for (String rowId : ROW_IDS) {
			assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
		}

		assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
	}

	/** Encodes {@code s} as UTF-8, independent of the JVM's default charset. */
	private static byte[] utf8Bytes(String s) {
		return s.getBytes(StandardCharsets.UTF_8);
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java new file mode 100644 index 0000000..8579dee --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase.example; + +public class HBaseFlinkTestConstants { + + public static final byte[] CF_SOME = "someCf".getBytes(); + public static final byte[] Q_SOME = "someQual".getBytes(); + public static final String TEST_TABLE_NAME = "test-table"; + public static final String TMP_DIR = "/tmp/test"; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java new file mode 100644 index 0000000..dccf876 --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.TableInputFormat; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Simple stub for HBase DataSet read + * + * To run the test first create the test table with hbase shell. + * + * Use the following commands: + * <ul> + * <li>create 'test-table', 'someCf'</li> + * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li> + * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li> + * </ul> + * + * The test should return just the first entry. + * + */ +public class HBaseReadExample { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + @SuppressWarnings("serial") + DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() { + + @Override + public String getTableName() { + return HBaseFlinkTestConstants.TEST_TABLE_NAME; + } + + @Override + protected Scan getScanner() { + Scan scan = new Scan(); + scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME); + return scan; + } + + private Tuple2<String, String> reuse = new Tuple2<String, String>(); + + @Override + protected Tuple2<String, String> mapResultToTuple(Result r) { + String key = Bytes.toString(r.getRow()); + String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME)); + reuse.setField(key, 0); + reuse.setField(val, 1); + return reuse; + } + }) + .filter(new FilterFunction<Tuple2<String,String>>() { + + @Override + public boolean filter(Tuple2<String, String> t) throws Exception { + String val = 
t.getField(1); + if(val.startsWith("someStr")) + return true; + return false; + } + }); + + hbaseDs.print(); + + // kick off execution. + env.execute(); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java new file mode 100644 index 0000000..483bdff --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; + +/** + * Simple stub for HBase DataSet write + * + * To run the test first create the test table with hbase shell. + * + * Use the following commands: + * <ul> + * <li>create 'test-table', 'someCf'</li> + * </ul> + * + */ +@SuppressWarnings("serial") +public class HBaseWriteExample { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = getTextDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + Job job = Job.getInstance(); + job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName); + // TODO is "mapred.output.dir" really useful? 
+ job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR); + counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() { + private transient Tuple2<Text, Mutation> reuse; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + reuse = new Tuple2<Text, Mutation>(); + } + + @Override + public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception { + reuse.f0 = new Text(t.f0); + Put put = new Put(t.f0.getBytes()); + put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1)); + reuse.f1 = put; + return reuse; + } + }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job)); + + // execute program + env.execute("WordCount (HBase sink) Example"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a user-defined + * FlatMapFunction. The function takes a line (String) and splits it into + * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
+ */ + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + private static boolean fileOutput = false; + private static String textPath; + private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + if(args.length == 2) { + textPath = args[0]; + outputTableName = args[1]; + } else { + System.err.println("Usage: HBaseWriteExample <text path> <output table>"); + return false; + } + } else { + System.out.println("Executing HBaseWriteExample example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: HBaseWriteExample <text path> <output table>"); + } + return true; + } + + private static DataSet<String> getTextDataSet(ExecutionEnvironment env) { + if(fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } else { + // get default test text data + return getDefaultTextLineDataSet(env); + } + } + private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { + return env.fromElements(WORDS); + } + private static final String[] WORDS = new String[] { + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against 
a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." 
+ }; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java new file mode 100644 index 0000000..05398db --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.addons.hbase.example;

import java.io.IOException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This is an example of how to write streams into HBase. In this example the
 * stream is written into a local HBase, but it is possible to adapt this
 * example for an HBase running in a cloud. You need a running local HBase with a
 * table "flinkExample" that has a column family "entry". If your HBase configuration does
 * not fit the hbase-site.xml in the resource folder, then you have to temporarily delete this
 * hbase-site.xml to execute the example properly.
 */
public class HBaseWriteStreamExample {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment
				.getExecutionEnvironment();

		// data stream with random numbers
		DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
			private static final long serialVersionUID = 1L;

			private volatile boolean isRunning = true;

			@Override
			public void run(SourceContext<String> out) throws Exception {
				while (isRunning) {
					out.collect(String.valueOf(Math.floor(Math.random() * 100)));
				}
			}

			@Override
			public void cancel() {
				isRunning = false;
			}
		});
		dataStream.writeUsingOutputFormat(new HBaseOutputFormat());

		env.execute();
	}

	/**
	 * OutputFormat that writes each record into the "entry:entry" cell of its own row in the
	 * "flinkExample" table. Row keys are "subtaskIndex-rowCounter".
	 */
	private static class HBaseOutputFormat implements OutputFormat<String> {

		private static final long serialVersionUID = 1L;

		private org.apache.hadoop.conf.Configuration conf = null;
		private HTable table = null;
		private String taskNumber = null;
		private int rowNumber = 0;

		@Override
		public void configure(Configuration parameters) {
			conf = HBaseConfiguration.create();
		}

		@Override
		public void open(int taskNumber, int numTasks) throws IOException {
			table = new HTable(conf, "flinkExample");
			this.taskNumber = String.valueOf(taskNumber);
		}

		@Override
		public void writeRecord(String record) throws IOException {
			// The separator keeps the keys of different subtasks apart: plain concatenation maps
			// subtask 1 / row 23 and subtask 12 / row 3 to the same key "123".
			Put put = new Put(Bytes.toBytes(taskNumber + "-" + rowNumber));
			// the stream element itself is the cell value
			put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
					Bytes.toBytes(record));
			rowNumber++;
			table.put(put);
		}

		@Override
		public void close() throws IOException {
			// open() may have failed before the table was created
			if (table == null) {
				return;
			}
			try {
				table.flushCommits();
			} finally {
				// release the table even if flushing the buffered puts fails
				table.close();
				table = null;
			}
		}

	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git
a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..804ff45 --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +log4j.rootLogger=DEBUG, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.threshold=INFO +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hcatalog/pom.xml b/flink-connectors/flink-hcatalog/pom.xml new file mode 100644 index 0000000..dde7996 --- /dev/null +++ b/flink-connectors/flink-hcatalog/pom.xml @@ -0,0 +1,182 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hcatalog</artifactId> + <name>flink-hcatalog</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hcatalog-core</artifactId> + <version>0.12.0</version> + <exclusions> + <exclusion> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + 
<plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + 
<groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Scala Code Style, most of the configuration done via plugin management --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> + + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java new file mode 100644 index 0000000..859b706 --- /dev/null +++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hcatalog; + +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.WritableTypeInfo; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +import 
org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A InputFormat to read from HCatalog tables. + * The InputFormat supports projection (selection and order of fields) and partition filters. + * + * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple. + * + * Note: Flink tuples might only support a limited number of fields (depending on the API). + * + * @param <T> + */ +public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private Configuration configuration; + + private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat; + private RecordReader<WritableComparable, HCatRecord> recordReader; + private boolean fetched = false; + private boolean hasNext; + + protected String[] fieldNames = new String[0]; + protected HCatSchema outputSchema; + + private TypeInformation<T> resultType; + + public HCatInputFormatBase() { } + + /** + * Creates a HCatInputFormat for the given database and table. + * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. + * The return type of the InputFormat can be changed to Flink-native tuples by calling + * {@link HCatInputFormatBase#asFlinkTuples()}. + * + * @param database The name of the database to read from. + * @param table The name of the table to read. 
+ * @throws java.io.IOException + */ + public HCatInputFormatBase(String database, String table) throws IOException { + this(database, table, new Configuration()); + } + + /** + * Creates a HCatInputFormat for the given database, table, and + * {@link org.apache.hadoop.conf.Configuration}. + * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. + * The return type of the InputFormat can be changed to Flink-native tuples by calling + * {@link HCatInputFormatBase#asFlinkTuples()}. + * + * @param database The name of the database to read from. + * @param table The name of the table to read. + * @param config The Configuration for the InputFormat. + * @throws java.io.IOException + */ + public HCatInputFormatBase(String database, String table, Configuration config) throws IOException { + super(); + this.configuration = config; + HadoopUtils.mergeHadoopConf(this.configuration); + + this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table); + this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration); + + // configure output schema of HCatFormat + configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); + // set type information + this.resultType = new WritableTypeInfo(DefaultHCatRecord.class); + } + + /** + * Specifies the fields which are returned by the InputFormat and their order. + * + * @param fields The fields and their order which are returned by the InputFormat. + * @return This InputFormat with specified return fields. + * @throws java.io.IOException + */ + public HCatInputFormatBase<T> getFields(String... 
fields) throws IOException { + + // build output schema + ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length); + for(String field : fields) { + fieldSchemas.add(this.outputSchema.get(field)); + } + this.outputSchema = new HCatSchema(fieldSchemas); + + // update output schema configuration + configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); + + return this; + } + + /** + * Specifies a SQL-like filter condition on the table's partition columns. + * Filter conditions on non-partition columns are invalid. + * A partition filter can significantly reduce the amount of data to be read. + * + * @param filter A SQL-like filter condition on the table's partition columns. + * @return This InputFormat with specified partition filter. + * @throws java.io.IOException + */ + public HCatInputFormatBase<T> withFilter(String filter) throws IOException { + + // set filter + this.hCatInputFormat.setFilter(filter); + + return this; + } + + /** + * Specifies that the InputFormat returns Flink tuples instead of + * {@link org.apache.hive.hcatalog.data.HCatRecord}. + * + * Note: Flink tuples might only support a limited number of fields (depending on the API). + * + * @return This InputFormat. 
+ * @throws org.apache.hive.hcatalog.common.HCatException + */ + public HCatInputFormatBase<T> asFlinkTuples() throws HCatException { + + // build type information + int numFields = outputSchema.getFields().size(); + if(numFields > this.getMaxFlinkTupleSize()) { + throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+ + " fields can be returned as Flink tuples."); + } + + TypeInformation[] fieldTypes = new TypeInformation[numFields]; + fieldNames = new String[numFields]; + for (String fieldName : outputSchema.getFieldNames()) { + HCatFieldSchema field = outputSchema.get(fieldName); + + int fieldPos = outputSchema.getPosition(fieldName); + TypeInformation fieldType = getFieldType(field); + + fieldTypes[fieldPos] = fieldType; + fieldNames[fieldPos] = fieldName; + + } + this.resultType = new TupleTypeInfo(fieldTypes); + + return this; + } + + protected abstract int getMaxFlinkTupleSize(); + + private TypeInformation getFieldType(HCatFieldSchema fieldSchema) { + + switch(fieldSchema.getType()) { + case INT: + return BasicTypeInfo.INT_TYPE_INFO; + case TINYINT: + return BasicTypeInfo.BYTE_TYPE_INFO; + case SMALLINT: + return BasicTypeInfo.SHORT_TYPE_INFO; + case BIGINT: + return BasicTypeInfo.LONG_TYPE_INFO; + case BOOLEAN: + return BasicTypeInfo.BOOLEAN_TYPE_INFO; + case FLOAT: + return BasicTypeInfo.FLOAT_TYPE_INFO; + case DOUBLE: + return BasicTypeInfo.DOUBLE_TYPE_INFO; + case STRING: + return BasicTypeInfo.STRING_TYPE_INFO; + case BINARY: + return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; + case ARRAY: + return new GenericTypeInfo(List.class); + case MAP: + return new GenericTypeInfo(Map.class); + case STRUCT: + return new GenericTypeInfo(List.class); + default: + throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered."); + } + } + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat. + * + * @return The Configuration of the HCatInputFormat. 
+ */ + public Configuration getConfiguration() { + return this.configuration; + } + + /** + * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord} + * returned by this InputFormat. + * + * @return The HCatSchema of the HCatRecords returned by this InputFormat. + */ + public HCatSchema getOutputSchema() { + return this.outputSchema; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(org.apache.flink.configuration.Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // no statistics provided at the moment + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); + + JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + List<InputSplit> splits; + try { + splits = this.hCatInputFormat.getSplits(jobContext); + } catch (InterruptedException e) { + throw new IOException("Could not get Splits.", e); + } + HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; + + for(int i = 0; i < hadoopInputSplits.length; i++){ + hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); + } + return hadoopInputSplits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + TaskAttemptContext context = null; + try { + context 
= HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); + } catch(Exception e) { + throw new RuntimeException(e); + } + + try { + this.recordReader = this.hCatInputFormat + .createRecordReader(split.getHadoopInputSplit(), context); + this.recordReader.initialize(split.getHadoopInputSplit(), context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordReader.", e); + } finally { + this.fetched = false; + } + } + + @Override + public boolean reachedEnd() throws IOException { + if(!this.fetched) { + fetchNext(); + } + return !this.hasNext; + } + + private void fetchNext() throws IOException { + try { + this.hasNext = this.recordReader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException("Could not fetch next KeyValue pair.", e); + } finally { + this.fetched = true; + } + } + + @Override + public T nextRecord(T record) throws IOException { + if(!this.fetched) { + // first record + fetchNext(); + } + if(!this.hasNext) { + return null; + } + try { + + // get next HCatRecord + HCatRecord v = this.recordReader.getCurrentValue(); + this.fetched = false; + + if(this.fieldNames.length > 0) { + // return as Flink tuple + return this.buildFlinkTuple(record, v); + + } else { + // return as HCatRecord + return (T)v; + } + + } catch (InterruptedException e) { + throw new IOException("Could not get next record.", e); + } + } + + protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException; + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Custom de/serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(this.fieldNames.length); + for(String fieldName : this.fieldNames) { + 
out.writeUTF(fieldName); + } + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + this.fieldNames = new String[in.readInt()]; + for(int i=0; i<this.fieldNames.length; i++) { + this.fieldNames[i] = in.readUTF(); + } + + Configuration configuration = new Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat(); + this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema")); + } + + // -------------------------------------------------------------------------------------------- + // Result type business + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation<T> getProducedType() { + return this.resultType; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java new file mode 100644 index 0000000..46f3cd5 --- /dev/null +++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hcatalog.java; + + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.hcatalog.HCatInputFormatBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.HCatRecord; + +/** + * A InputFormat to read from HCatalog tables. + * The InputFormat supports projection (selection and order of fields) and partition filters. + * + * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. + * Flink tuples support only up to 25 fields. 
+ * + * @param <T> + */ +public class HCatInputFormat<T> extends HCatInputFormatBase<T> { + private static final long serialVersionUID = 1L; + + public HCatInputFormat() {} + + public HCatInputFormat(String database, String table) throws Exception { + super(database, table); + } + + public HCatInputFormat(String database, String table, Configuration config) throws Exception { + super(database, table, config); + } + + + @Override + protected int getMaxFlinkTupleSize() { + return 25; + } + + @Override + protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException { + + Tuple tuple = (Tuple)t; + + // Extract all fields from HCatRecord + for(int i=0; i < this.fieldNames.length; i++) { + + // get field value + Object o = record.get(this.fieldNames[i], this.outputSchema); + + // Set field value in Flink tuple. + // Partition columns are returned as String and + // need to be converted to original type. + switch(this.outputSchema.get(i).getType()) { + case INT: + if(o instanceof String) { + tuple.setField(Integer.parseInt((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case TINYINT: + if(o instanceof String) { + tuple.setField(Byte.parseByte((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case SMALLINT: + if(o instanceof String) { + tuple.setField(Short.parseShort((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case BIGINT: + if(o instanceof String) { + tuple.setField(Long.parseLong((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case BOOLEAN: + if(o instanceof String) { + tuple.setField(Boolean.parseBoolean((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case FLOAT: + if(o instanceof String) { + tuple.setField(Float.parseFloat((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case DOUBLE: + if(o instanceof String) { + tuple.setField(Double.parseDouble((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case STRING: + tuple.setField(o, 
i); + break; + case BINARY: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type BINARY."); + } else { + tuple.setField(o, i); + } + break; + case ARRAY: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type ARRAY."); + } else { + tuple.setField(o, i); + } + break; + case MAP: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type MAP."); + } else { + tuple.setField(o, i); + } + break; + case STRUCT: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type STRUCT."); + } else { + tuple.setField(o, i); + } + break; + default: + throw new RuntimeException("Invalid Type"); + } + } + + return (T)tuple; + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala new file mode 100644 index 0000000..0299ee1 --- /dev/null +++ b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
+ * Scala tuples support only up to 22 fields.
+ *
+ */
+class HCatInputFormat[T](
+    database: String,
+    table: String,
+    config: Configuration
+  ) extends HCatInputFormatBase[T](database, table, config) {
+
+  def this(database: String, table: String) {
+    this(database, table, new Configuration)
+  }
+
+  /** Reusable buffer holding the converted field values of the current record. */
+  var vals: Array[Any] = Array[Any]()
+
+  override def configure(parameters: configuration.Configuration): Unit = {
+    super.configure(parameters)
+    vals = new Array[Any](fieldNames.length)
+  }
+
+  override protected def getMaxFlinkTupleSize: Int = 22
+
+  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+    // configure() sizes the buffer; re-size it if it does not match the projection
+    // (e.g. configure() was not called) instead of failing with an index error.
+    if (vals.length != this.fieldNames.length) {
+      vals = new Array[Any](this.fieldNames.length)
+    }
+
+    // Extract all fields from HCatRecord
+    var i: Int = 0
+    while (i < this.fieldNames.length) {
+      val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+      vals(i) = convertField(o, this.outputSchema.get(i).getType)
+      i += 1
+    }
+    createScalaTuple(vals)
+  }
+
+  /**
+   * Converts a value read from an [[HCatRecord]] into the value stored in the Scala tuple.
+   *
+   * Partition columns are returned as String and are parsed to their declared type.
+   * Binary and complex types cannot be partition keys, so a String value for them is rejected.
+   * Complex values are kept as the Java collections HCatalog returns (java.util.List/Map),
+   * matching the GenericTypeInfo declared by the base class.
+   */
+  private def convertField(o: AnyRef, fieldType: HCatFieldSchema.Type): Any = {
+    fieldType match {
+      case HCatFieldSchema.Type.INT =>
+        o match {
+          case s: String => s.toInt
+          case _ => o.asInstanceOf[Int]
+        }
+      case HCatFieldSchema.Type.TINYINT =>
+        o match {
+          case s: String => s.toInt.toByte
+          case _ => o.asInstanceOf[Byte]
+        }
+      case HCatFieldSchema.Type.SMALLINT =>
+        o match {
+          case s: String => s.toInt.toShort
+          case _ => o.asInstanceOf[Short]
+        }
+      case HCatFieldSchema.Type.BIGINT =>
+        o match {
+          case s: String => s.toLong
+          case _ => o.asInstanceOf[Long]
+        }
+      case HCatFieldSchema.Type.BOOLEAN =>
+        o match {
+          case s: String => s.toBoolean
+          case _ => o.asInstanceOf[Boolean]
+        }
+      case HCatFieldSchema.Type.FLOAT =>
+        o match {
+          case s: String => s.toFloat
+          case _ => o.asInstanceOf[Float]
+        }
+      case HCatFieldSchema.Type.DOUBLE =>
+        o match {
+          case s: String => s.toDouble
+          case _ => o.asInstanceOf[Double]
+        }
+      case HCatFieldSchema.Type.STRING =>
+        o
+      case HCatFieldSchema.Type.BINARY =>
+        if (o.isInstanceOf[String]) {
+          throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+        }
+        o.asInstanceOf[Array[Byte]]
+      case HCatFieldSchema.Type.ARRAY =>
+        if (o.isInstanceOf[String]) {
+          throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
+        }
+        o.asInstanceOf[java.util.List[Object]]
+      case HCatFieldSchema.Type.MAP =>
+        if (o.isInstanceOf[String]) {
+          throw new RuntimeException("Cannot handle partition keys of type MAP.")
+        }
+        o.asInstanceOf[java.util.Map[Object, Object]]
+      case HCatFieldSchema.Type.STRUCT =>
+        if (o.isInstanceOf[String]) {
+          throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
+        }
+        o.asInstanceOf[java.util.List[Object]]
+      case other =>
+        throw new RuntimeException("Invalid type " + other + " encountered.")
+    }
+  }
+
+  /** Wraps the first `fieldNames.length` converted values into a Scala tuple of that arity. */
+  private def createScalaTuple(vals: Array[Any]): T = {
+
+    this.fieldNames.length match {
+      case 1 =>
+        new Tuple1(vals(0)).asInstanceOf[T]
+      case 2 =>
+        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+      case 3 =>
+        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+      case 4 =>
+        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+      case 5 =>
+        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+      case 6 =>
+        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+      case 7 =>
+        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+      case 8 =>
+        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+          .asInstanceOf[T]
+      case 9 =>
+        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8)).asInstanceOf[T]
+      case 10 =>
+        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9)).asInstanceOf[T]
+      case 11 =>
+        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10)).asInstanceOf[T]
+      case 12 =>
+        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+      case 13 =>
+        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+      case 14 =>
+        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+      case 15 =>
+        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+      case 16 =>
+        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+          .asInstanceOf[T]
+      case 17 =>
+        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16)).asInstanceOf[T]
+      case 18 =>
+        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17)).asInstanceOf[T]
+      case 19 =>
+        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18)).asInstanceOf[T]
+      case 20 =>
+        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+      case 21 =>
+        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+      case 22 =>
+        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+      case _ =>
+        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/pom.xml
b/flink-connectors/flink-jdbc/pom.xml new file mode 100644 index 0000000..be42648 --- /dev/null +++ b/flink-connectors/flink-jdbc/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-jdbc</artifactId> + <name>flink-jdbc</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.10.1.1</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java new file mode 100644 index 0000000..b4246f5 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * InputFormat to read data from a database and generate Rows. + * The InputFormat has to be configured using the supplied InputFormatBuilder. 
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br> + * + * <pre><code> + * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { + * BasicTypeInfo.INT_TYPE_INFO, + * BasicTypeInfo.STRING_TYPE_INFO, + * BasicTypeInfo.STRING_TYPE_INFO, + * BasicTypeInfo.DOUBLE_TYPE_INFO, + * BasicTypeInfo.INT_TYPE_INFO + * }; + * + * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); + * + * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + * .setDBUrl("jdbc:derby:memory:ebookshop") + * .setQuery("select * from books") + * .setRowTypeInfo(rowTypeInfo) + * .finish(); + * </code></pre> + * + * In order to query the JDBC source in parallel, you need to provide a + * parameterized query template (i.e. a valid {@link PreparedStatement}) and + * a {@link ParameterValuesProvider} which provides binding values for the + * query parameters. E.g.:</br> + * + * <pre><code> + * + * Serializable[][] queryParameters = new String[2][1]; + * queryParameters[0] = new String[]{"Kumar"}; + * queryParameters[1] = new String[]{"Tan Ah Teck"}; + * + * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + * .setDBUrl("jdbc:derby:memory:ebookshop") + * .setQuery("select * from books WHERE author = ?") + * .setRowTypeInfo(rowTypeInfo) + * .setParametersProvider(new GenericParameterValuesProvider(queryParameters)) + * .finish(); + * </code></pre> + * + * @see Row + * @see ParameterValuesProvider + * @see PreparedStatement + * @see DriverManager + */ +public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); + + private String username; + private String password; + private String drivername; + private String dbURL; + private String 
queryTemplate; + private int resultSetType; + private int resultSetConcurrency; + private RowTypeInfo rowTypeInfo; + + private transient Connection dbConn; + private transient PreparedStatement statement; + private transient ResultSet resultSet; + + private boolean hasNext; + private Object[][] parameterValues; + + public JDBCInputFormat() { + } + + @Override + public RowTypeInfo getProducedType() { + return rowTypeInfo; + } + + @Override + public void configure(Configuration parameters) { + //do nothing here + } + + @Override + public void openInputFormat() { + //called once per inputFormat (on open) + try { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); + } catch (SQLException se) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); + } + } + + @Override + public void closeInputFormat() { + //called once per inputFormat (on close) + try { + if(statement != null) { + statement.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage()); + } finally { + statement = null; + } + + try { + if(dbConn != null) { + dbConn.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } finally { + dbConn = null; + } + + parameterValues = null; + } + + /** + * Connects to the source database and executes the query in a <b>parallel + * fashion</b> if + * this {@link InputFormat} is built using a parameterized query (i.e. using + * a {@link PreparedStatement}) + * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel + * fashion</b> otherwise. 
+ * + * @param inputSplit which is ignored if this InputFormat is executed as a + * non-parallel source, + * a "hook" to the query parameters otherwise (using its + * <i>splitNumber</i>) + * @throws IOException if there's an error during the execution of the query + */ + @Override + public void open(InputSplit inputSplit) throws IOException { + try { + if (inputSplit != null && parameterValues != null) { + for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { + Object param = parameterValues[inputSplit.getSplitNumber()][i]; + if (param instanceof String) { + statement.setString(i + 1, (String) param); + } else if (param instanceof Long) { + statement.setLong(i + 1, (Long) param); + } else if (param instanceof Integer) { + statement.setInt(i + 1, (Integer) param); + } else if (param instanceof Double) { + statement.setDouble(i + 1, (Double) param); + } else if (param instanceof Boolean) { + statement.setBoolean(i + 1, (Boolean) param); + } else if (param instanceof Float) { + statement.setFloat(i + 1, (Float) param); + } else if (param instanceof BigDecimal) { + statement.setBigDecimal(i + 1, (BigDecimal) param); + } else if (param instanceof Byte) { + statement.setByte(i + 1, (Byte) param); + } else if (param instanceof Short) { + statement.setShort(i + 1, (Short) param); + } else if (param instanceof Date) { + statement.setDate(i + 1, (Date) param); + } else if (param instanceof Time) { + statement.setTime(i + 1, (Time) param); + } else if (param instanceof Timestamp) { + statement.setTimestamp(i + 1, (Timestamp) param); + } else if (param instanceof Array) { + statement.setArray(i + 1, (Array) param); + } else { + //extends with other types if needed + throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." 
); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); + } + } + resultSet = statement.executeQuery(); + hasNext = resultSet.next(); + } catch (SQLException se) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); + } + } + + /** + * Closes all resources used. + * + * @throws IOException Indicates that a resource could not be closed. + */ + @Override + public void close() throws IOException { + if(resultSet == null) { + return; + } + try { + resultSet.close(); + } catch (SQLException se) { + LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage()); + } + } + + /** + * Checks whether all data has been read. + * + * @return boolean value indication whether all data has been read. + * @throws IOException + */ + @Override + public boolean reachedEnd() throws IOException { + return !hasNext; + } + + /** + * Stores the next resultSet row in a tuple + * + * @param row row to be reused. 
+ * @return row containing next {@link Row} + * @throws java.io.IOException + */ + @Override + public Row nextRecord(Row row) throws IOException { + try { + if (!hasNext) { + return null; + } + for (int pos = 0; pos < row.productArity(); pos++) { + row.setField(pos, resultSet.getObject(pos + 1)); + } + //update hasNext after we've read the record + hasNext = resultSet.next(); + return row; + } catch (SQLException se) { + throw new IOException("Couldn't read data - " + se.getMessage(), se); + } catch (NullPointerException npe) { + throw new IOException("Couldn't access resultSet", npe); + } + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws IOException { + if (parameterValues == null) { + return new GenericInputSplit[]{new GenericInputSplit(0, 1)}; + } + GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length]; + for (int i = 0; i < ret.length; i++) { + ret[i] = new GenericInputSplit(i, ret.length); + } + return ret; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** + * A builder used to set parameters to the output format's configuration in a fluent way. 
+ * @return builder + */ + public static JDBCInputFormatBuilder buildJDBCInputFormat() { + return new JDBCInputFormatBuilder(); + } + + public static class JDBCInputFormatBuilder { + private final JDBCInputFormat format; + + public JDBCInputFormatBuilder() { + this.format = new JDBCInputFormat(); + //using TYPE_FORWARD_ONLY for high performance reads + this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY; + this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; + } + + public JDBCInputFormatBuilder setUsername(String username) { + format.username = username; + return this; + } + + public JDBCInputFormatBuilder setPassword(String password) { + format.password = password; + return this; + } + + public JDBCInputFormatBuilder setDrivername(String drivername) { + format.drivername = drivername; + return this; + } + + public JDBCInputFormatBuilder setDBUrl(String dbURL) { + format.dbURL = dbURL; + return this; + } + + public JDBCInputFormatBuilder setQuery(String query) { + format.queryTemplate = query; + return this; + } + + public JDBCInputFormatBuilder setResultSetType(int resultSetType) { + format.resultSetType = resultSetType; + return this; + } + + public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) { + format.resultSetConcurrency = resultSetConcurrency; + return this; + } + + public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) { + format.parameterValues = parameterValuesProvider.getParameterValues(); + return this; + } + + public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) { + format.rowTypeInfo = rowTypeInfo; + return this; + } + + public JDBCInputFormat finish() { + if (format.username == null) { + LOG.info("Username was not supplied separately."); + } + if (format.password == null) { + LOG.info("Password was not supplied separately."); + } + if (format.dbURL == null) { + throw new IllegalArgumentException("No database URL supplied"); + } + if 
(format.queryTemplate == null) { + throw new IllegalArgumentException("No query supplied"); + } + if (format.drivername == null) { + throw new IllegalArgumentException("No driver supplied"); + } + if (format.rowTypeInfo == null) { + throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied"); + } + if (format.parameterValues == null) { + LOG.debug("No input splitting configured (data will be read with parallelism 1)."); + } + return format; + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java new file mode 100644 index 0000000..da4b1ad --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.table.Row; +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OutputFormat to write tuples into a database. + * The OutputFormat has to be configured using the supplied OutputFormatBuilder. + * + * @see Tuple + * @see DriverManager + */ +public class JDBCOutputFormat extends RichOutputFormat<Row> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); + + private String username; + private String password; + private String drivername; + private String dbURL; + private String query; + private int batchInterval = 5000; + + private Connection dbConn; + private PreparedStatement upload; + + private int batchCount = 0; + + public int[] typesArray; + + public JDBCOutputFormat() { + } + + @Override + public void configure(Configuration parameters) { + } + + /** + * Connects to the target database and initializes the prepared statement. + * + * @param taskNumber The number of the parallel instance. + * @throws IOException Thrown, if the output could not be opened due to an + * I/O problem. 
+ */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + establishConnection(); + upload = dbConn.prepareStatement(query); + } catch (SQLException sqe) { + throw new IllegalArgumentException("open() failed.", sqe); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC driver class not found.", cnfe); + } + } + + private void establishConnection() throws SQLException, ClassNotFoundException { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + } + + /** + * Adds a record to the prepared statement. + * <p> + * When this method is called, the output format is guaranteed to be opened. + * </p> + * + * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to + * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null)) + * + * @param row The records to add to the output. + * @see PreparedStatement + * @throws IOException Thrown, if the records could not be added due to an I/O problem. + */ + @Override + public void writeRecord(Row row) throws IOException { + + if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) { + LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); + } + try { + + if (typesArray == null ) { + // no types provided + for (int index = 0; index < row.productArity(); index++) { + LOG.warn("Unknown column type for column %s. 
Best effort approach to set its value: %s.", index + 1, row.productElement(index)); + upload.setObject(index + 1, row.productElement(index)); + } + } else { + // types provided + for (int index = 0; index < row.productArity(); index++) { + + if (row.productElement(index) == null) { + upload.setNull(index + 1, typesArray[index]); + } else { + // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html + switch (typesArray[index]) { + case java.sql.Types.NULL: + upload.setNull(index + 1, typesArray[index]); + break; + case java.sql.Types.BOOLEAN: + case java.sql.Types.BIT: + upload.setBoolean(index + 1, (boolean) row.productElement(index)); + break; + case java.sql.Types.CHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.LONGNVARCHAR: + upload.setString(index + 1, (String) row.productElement(index)); + break; + case java.sql.Types.TINYINT: + upload.setByte(index + 1, (byte) row.productElement(index)); + break; + case java.sql.Types.SMALLINT: + upload.setShort(index + 1, (short) row.productElement(index)); + break; + case java.sql.Types.INTEGER: + upload.setInt(index + 1, (int) row.productElement(index)); + break; + case java.sql.Types.BIGINT: + upload.setLong(index + 1, (long) row.productElement(index)); + break; + case java.sql.Types.REAL: + upload.setFloat(index + 1, (float) row.productElement(index)); + break; + case java.sql.Types.FLOAT: + case java.sql.Types.DOUBLE: + upload.setDouble(index + 1, (double) row.productElement(index)); + break; + case java.sql.Types.DECIMAL: + case java.sql.Types.NUMERIC: + upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index)); + break; + case java.sql.Types.DATE: + upload.setDate(index + 1, (java.sql.Date) row.productElement(index)); + break; + case java.sql.Types.TIME: + upload.setTime(index + 1, (java.sql.Time) row.productElement(index)); + break; + case java.sql.Types.TIMESTAMP: + 
upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index)); + break; + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + upload.setBytes(index + 1, (byte[]) row.productElement(index)); + break; + default: + upload.setObject(index + 1, row.productElement(index)); + LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.", + typesArray[index], index + 1, row.productElement(index)); + // case java.sql.Types.SQLXML + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUC + } + } + } + } + upload.addBatch(); + batchCount++; + if (batchCount >= batchInterval) { + upload.executeBatch(); + batchCount = 0; + } + } catch (SQLException | IllegalArgumentException e) { + throw new IllegalArgumentException("writeRecord() failed", e); + } + } + + /** + * Executes prepared statement and closes all resources of this instance. + * + * @throws IOException Thrown, if the input could not be closed properly. 
+ */ + @Override + public void close() throws IOException { + try { + if (upload != null) { + upload.executeBatch(); + upload.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } finally { + upload = null; + batchCount = 0; + } + + try { + if (dbConn != null) { + dbConn.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } finally { + dbConn = null; + } + } + + public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { + return new JDBCOutputFormatBuilder(); + } + + public static class JDBCOutputFormatBuilder { + private final JDBCOutputFormat format; + + protected JDBCOutputFormatBuilder() { + this.format = new JDBCOutputFormat(); + } + + public JDBCOutputFormatBuilder setUsername(String username) { + format.username = username; + return this; + } + + public JDBCOutputFormatBuilder setPassword(String password) { + format.password = password; + return this; + } + + public JDBCOutputFormatBuilder setDrivername(String drivername) { + format.drivername = drivername; + return this; + } + + public JDBCOutputFormatBuilder setDBUrl(String dbURL) { + format.dbURL = dbURL; + return this; + } + + public JDBCOutputFormatBuilder setQuery(String query) { + format.query = query; + return this; + } + + public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { + format.batchInterval = batchInterval; + return this; + } + + public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { + format.typesArray = typesArray; + return this; + } + + /** + * Finalizes the configuration and checks validity. 
+ * + * @return Configured JDBCOutputFormat + */ + public JDBCOutputFormat finish() { + if (format.username == null) { + LOG.info("Username was not supplied separately."); + } + if (format.password == null) { + LOG.info("Password was not supplied separately."); + } + if (format.dbURL == null) { + throw new IllegalArgumentException("No dababase URL supplied."); + } + if (format.query == null) { + throw new IllegalArgumentException("No query suplied"); + } + if (format.drivername == null) { + throw new IllegalArgumentException("No driver supplied"); + } + + return format; + } + } + +}
