[FLINK-1466] Adds HCatInputFormats to read from HCatalog tables.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6acd2e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6acd2e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6acd2e4 Branch: refs/heads/master Commit: a6acd2e47c5a8b31e580f66253cad2966e11d3cf Parents: 3d84970 Author: Fabian Hueske <[email protected]> Authored: Thu Jan 29 10:34:29 2015 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Feb 20 16:10:23 2015 +0100 ---------------------------------------------------------------------- flink-staging/flink-hcatalog/pom.xml | 186 +++++++++ .../flink/hcatalog/HCatInputFormatBase.java | 413 +++++++++++++++++++ .../flink/hcatalog/java/HCatInputFormat.java | 140 +++++++ .../flink/hcatalog/scala/HCatInputFormat.scala | 210 ++++++++++ flink-staging/pom.xml | 1 + 5 files changed, 950 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml new file mode 100644 index 0000000..8762b9c --- /dev/null +++ b/flink-staging/flink-hcatalog/pom.xml @@ -0,0 +1,186 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hcatalog</artifactId> + <name>flink-hcatalog</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hcatalog-core</artifactId> + <version>0.12.0</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java new file mode 100644 index 0000000..59a6719 --- /dev/null +++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hcatalog; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.WritableTypeInfo; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * A InputFormat to read from HCatalog tables. + * The InputFormat supports projection (selection and order of fields) and partition filters. + * + * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. + * Flink Tuples are only supported for primitive type fields + * (no STRUCT, ARRAY, or MAP data types) and have a size limitation. + * + * @param <T> + */ +public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private Configuration configuration; + + private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat; + private RecordReader<WritableComparable, HCatRecord> recordReader; + private boolean fetched = false; + private boolean hasNext; + + protected String[] fieldNames = new String[0]; + protected HCatSchema outputSchema; + + private TypeInformation<T> resultType; + + public HCatInputFormatBase() { } + + /** + * Creates a HCatInputFormat for the given database and table. + * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. + * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling + * {@link HCatInputFormatBase#asFlinkTuples()}. + * + * @param database The name of the database to read from. + * @param table The name of the table to read. + * @throws java.io.IOException + */ + public HCatInputFormatBase(String database, String table) throws IOException { + this(database, table, new Configuration()); + } + + /** + * Creates a HCatInputFormat for the given database, table, and + * {@link org.apache.hadoop.conf.Configuration}. + * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. + * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling + * {@link HCatInputFormatBase#asFlinkTuples()}. + * + * @param database The name of the database to read from. + * @param table The name of the table to read. + * @param config The Configuration for the InputFormat. + * @throws java.io.IOException + */ + public HCatInputFormatBase(String database, String table, Configuration config) throws IOException { + super(); + this.configuration = config; + HadoopUtils.mergeHadoopConf(this.configuration); + + this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table); + this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration); + + // configure output schema of HCatFormat + configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); + // set type information + this.resultType = new WritableTypeInfo(DefaultHCatRecord.class); + } + + /** + * Specifies the fields which are returned by the InputFormat and their order. + * + * @param fields The fields and their order which are returned by the InputFormat. + * @return This InputFormat with specified return fields. + * @throws java.io.IOException + */ + public HCatInputFormatBase<T> getFields(String... fields) throws IOException { + + // build output schema + ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length); + for(String field : fields) { + fieldSchemas.add(this.outputSchema.get(field)); + } + this.outputSchema = new HCatSchema(fieldSchemas); + + // update output schema configuration + configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); + + return this; + } + + /** + * Specifies a SQL-like filter condition on the table's partition columns. + * Filter conditions on non-partition columns are invalid. + * A partition filter can significantly reduce the amount of data to be read. + * + * @param filter A SQL-like filter condition on the table's partition columns. + * @return This InputFormat with specified partition filter. + * @throws java.io.IOException + */ + public HCatInputFormatBase<T> withFilter(String filter) throws IOException { + + // set filter + this.hCatInputFormat.setFilter(filter); + + return this; + } + + /** + * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple} + * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}. + * At the moment, the following restrictions apply for returning Flink tuples: + * + * <ul> + * <li>Only primitive type fields can be returned in Flink Tuples + * (no STRUCT, MAP, ARRAY data types).</li> + * <li>Only a limited number of fields can be returned as Flink Tuple.</li> + * </ul> + * + * @return This InputFormat. + * @throws org.apache.hive.hcatalog.common.HCatException + */ + public HCatInputFormatBase<T> asFlinkTuples() throws HCatException { + + // build type information + int numFields = outputSchema.getFields().size(); + if(numFields > this.getMaxFlinkTupleSize()) { + throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+ + " fields can be returned as Flink tuples."); + } + + TypeInformation[] fieldTypes = new TypeInformation[numFields]; + fieldNames = new String[numFields]; + for (String fieldName : outputSchema.getFieldNames()) { + HCatFieldSchema field = outputSchema.get(fieldName); + + int fieldPos = outputSchema.getPosition(fieldName); + TypeInformation fieldType = getFieldType(field); + + fieldTypes[fieldPos] = fieldType; + fieldNames[fieldPos] = fieldName; + + } + this.resultType = new TupleTypeInfo(fieldTypes); + + return this; + } + + protected abstract int getMaxFlinkTupleSize(); + + private TypeInformation getFieldType(HCatFieldSchema fieldSchema) { + + switch(fieldSchema.getType()) { + case INT: + return BasicTypeInfo.INT_TYPE_INFO; + case TINYINT: + return BasicTypeInfo.BYTE_TYPE_INFO; + case SMALLINT: + return BasicTypeInfo.SHORT_TYPE_INFO; + case BIGINT: + return BasicTypeInfo.LONG_TYPE_INFO; + case BOOLEAN: + return BasicTypeInfo.BOOLEAN_TYPE_INFO; + case FLOAT: + return BasicTypeInfo.FLOAT_TYPE_INFO; + case DOUBLE: + return BasicTypeInfo.DOUBLE_TYPE_INFO; + case STRING: + return BasicTypeInfo.STRING_TYPE_INFO; + case BINARY: + return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; + case ARRAY: + throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet."); + case MAP: + throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet."); + case STRUCT: + throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet."); + default: + throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered."); + } + } + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat. + * + * @return The Configuration of the HCatInputFormat. + */ + public Configuration getConfiguration() { + return this.configuration; + } + + /** + * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord} + * returned by this InputFormat. + * + * @return The HCatSchema of the HCatRecords returned by this InputFormat. + */ + public HCatSchema getOutputSchema() { + return this.outputSchema; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(org.apache.flink.configuration.Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // no statistics provided at the moment + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); + + JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + List<InputSplit> splits; + try { + splits = this.hCatInputFormat.getSplits(jobContext); + } catch (InterruptedException e) { + throw new IOException("Could not get Splits.", e); + } + HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; + + for(int i = 0; i < hadoopInputSplits.length; i++){ + hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); + } + return hadoopInputSplits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + TaskAttemptContext context = null; + try { + context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); + } catch(Exception e) { + throw new RuntimeException(e); + } + + try { + this.recordReader = this.hCatInputFormat + .createRecordReader(split.getHadoopInputSplit(), context); + this.recordReader.initialize(split.getHadoopInputSplit(), context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordReader.", e); + } finally { + this.fetched = false; + } + } + + @Override + public boolean reachedEnd() throws IOException { + if(!this.fetched) { + fetchNext(); + } + return !this.hasNext; + } + + private void fetchNext() throws IOException { + try { + this.hasNext = this.recordReader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException("Could not fetch next KeyValue pair.", e); + } finally { + this.fetched = true; + } + } + + @Override + public T nextRecord(T record) throws IOException { + if(!this.fetched) { + // first record + fetchNext(); + } + if(!this.hasNext) { + return null; + } + try { + + // get next HCatRecord + HCatRecord v = this.recordReader.getCurrentValue(); + this.fetched = false; + + if(this.fieldNames.length > 0) { + // return as Flink tuple + return this.buildFlinkTuple(record, v); + + } else { + // return as HCatRecord + return (T)v; + } + + } catch (InterruptedException e) { + throw new IOException("Could not get next record.", e); + } + } + + protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException; + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Custom de/serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(this.fieldNames.length); + for(String fieldName : this.fieldNames) { + out.writeUTF(fieldName); + } + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + this.fieldNames = new String[in.readInt()]; + for(int i=0; i<this.fieldNames.length; i++) { + this.fieldNames[i] = in.readUTF(); + } + + Configuration configuration = new Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat(); + this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema")); + } + + // -------------------------------------------------------------------------------------------- + // Result type business + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation<T> getProducedType() { + return this.resultType; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java new file mode 100644 index 0000000..c3c3a1c --- /dev/null +++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hcatalog.java; + + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.hcatalog.HCatInputFormatBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.HCatRecord; + +/** + * A InputFormat to read from HCatalog tables. + * The InputFormat supports projection (selection and order of fields) and partition filters. + * + * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. + * Flink Tuples are only supported for up to 25 fields of primitive types + * (no STRUCT, ARRAY, or MAP data types). + * + * @param <T> + */ +public class HCatInputFormat<T> extends HCatInputFormatBase<T> { + private static final long serialVersionUID = 1L; + + public HCatInputFormat() {} + + public HCatInputFormat(String database, String table) throws Exception { + super(database, table); + } + + public HCatInputFormat(String database, String table, Configuration config) throws Exception { + super(database, table, config); + } + + + @Override + protected int getMaxFlinkTupleSize() { + return 25; + } + + @Override + protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException { + + Tuple tuple = (Tuple)t; + + // Extract all fields from HCatRecord + for(int i=0; i < this.fieldNames.length; i++) { + + // get field value + Object o = record.get(this.fieldNames[i], this.outputSchema); + + // Set field value in Flink tuple. + // Partition columns are returned as String and + // need to be converted to original type. + switch(this.outputSchema.get(i).getType()) { + case INT: + if(o instanceof String) { + tuple.setField(Integer.parseInt((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case TINYINT: + if(o instanceof String) { + tuple.setField(Byte.parseByte((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case SMALLINT: + if(o instanceof String) { + tuple.setField(Short.parseShort((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case BIGINT: + if(o instanceof String) { + tuple.setField(Long.parseLong((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case BOOLEAN: + if(o instanceof String) { + tuple.setField(Boolean.parseBoolean((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case FLOAT: + if(o instanceof String) { + tuple.setField(Float.parseFloat((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case DOUBLE: + if(o instanceof String) { + tuple.setField(Double.parseDouble((String) o), i); + } else { + tuple.setField(o, i); + } + break; + case STRING: + tuple.setField(o, i); + break; + case BINARY: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type BINARY."); + } else { + tuple.setField(o, i); + } + break; + default: + throw new RuntimeException("Invalid Type"); + } + } + + return (T)tuple; + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala new file mode 100644 index 0000000..7cc18f0 --- /dev/null +++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hcatalog.scala + +import org.apache.flink.configuration +import org.apache.flink.hcatalog.HCatInputFormatBase +import org.apache.hadoop.conf.Configuration +import org.apache.hive.hcatalog.data.HCatRecord +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema + +/** + * A InputFormat to read from HCatalog tables. + * The InputFormat supports projection (selection and order of fields) and partition filters. + * + * Data can be returned as {@link HCatRecord} or + * Flink {@link org.apache.flink.api.java.tuple.Tuple}. + * Flink Tuples are only supported for up to 22 fields of primitive types + * (no STRUCT, ARRAY, or MAP data types). + * + */ +class HCatInputFormat[T]( + database: String, + table: String, + config: Configuration + ) extends HCatInputFormatBase[T](database, table, config) { + + def this(database: String, table: String) { + this(database, table, new Configuration) + } + + var vals: Array[Any] = Array[Any]() + + override def configure(parameters: configuration.Configuration): Unit = { + super.configure(parameters) + vals = new Array[Any](fieldNames.length) + } + + override protected def getMaxFlinkTupleSize: Int = 22 + + override protected def buildFlinkTuple(t: T, record: HCatRecord): T = { + + // Extract all fields from HCatRecord + var i: Int = 0 + while (i < this.fieldNames.length) { + + val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema) + + // partition columns are returned as String + // Check and convert to actual type. + this.outputSchema.get(i).getType match { + case HCatFieldSchema.Type.INT => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toInt + } + else { + vals(i) = o.asInstanceOf[Int] + } + case HCatFieldSchema.Type.TINYINT => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toInt.toByte + } + else { + vals(i) = o.asInstanceOf[Byte] + } + case HCatFieldSchema.Type.SMALLINT => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toInt.toShort + } + else { + vals(i) = o.asInstanceOf[Short] + } + case HCatFieldSchema.Type.BIGINT => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toLong + } + else { + vals(i) = o.asInstanceOf[Long] + } + case HCatFieldSchema.Type.BOOLEAN => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toBoolean + } + else { + vals(i) = o.asInstanceOf[Boolean] + } + case HCatFieldSchema.Type.FLOAT => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toFloat + } + else { + vals(i) = o.asInstanceOf[Float] + } + case HCatFieldSchema.Type.DOUBLE => + if (o.isInstanceOf[String]) { + vals(i) = o.asInstanceOf[String].toDouble + } + else { + vals(i) = o.asInstanceOf[Double] + } + case HCatFieldSchema.Type.STRING => + vals(i) = o + case HCatFieldSchema.Type.BINARY => + if (o.isInstanceOf[String]) { + throw new RuntimeException("Cannot handle partition keys of type BINARY.") + } + else { + vals(i) = o.asInstanceOf[Array[Byte]] + } + case _ => + throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType + + " encountered.") + } + + i += 1 + } + createScalaTuple(vals) + } + + private def createScalaTuple(vals: Array[Any]): T = { + + this.fieldNames.length match { + case 1 => + new Tuple1(vals(0)).asInstanceOf[T] + case 2 => + new Tuple2(vals(0), vals(1)).asInstanceOf[T] + case 3 => + new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T] + case 4 => + new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T] + case 5 => + new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T] + case 6 => + new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T] + case 7 => + new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T] + case 8 => + new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7)) + .asInstanceOf[T] + case 9 => + new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8)).asInstanceOf[T] + case 10 => + new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9)).asInstanceOf[T] + case 11 => + new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10)).asInstanceOf[T] + case 12 => + new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T] + case 13 => + new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T] + case 14 => + new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T] + case 15 => + new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T] + case 16 => + new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15)) + .asInstanceOf[T] + case 17 => + new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16)).asInstanceOf[T] + case 18 => + new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16), vals(17)).asInstanceOf[T] + case 19 => + new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16), vals(17), vals(18)).asInstanceOf[T] + case 20 => + new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T] + case 21 => + new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T] + case 22 => + new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), + vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), + vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T] + case _ => + throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.") + + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml index ea59e70..1b98cbe 100644 --- a/flink-staging/pom.xml +++ b/flink-staging/pom.xml @@ -41,6 +41,7 @@ under the License. <module>flink-streaming</module> <module>flink-hbase</module> <module>flink-gelly</module> + <module>flink-hcatalog</module> </modules> <!-- See main pom.xml for explanation of profiles -->
