[FLINK-1466] Adds HCatInputFormats to read from HCatalog tables.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6acd2e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6acd2e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6acd2e4

Branch: refs/heads/master
Commit: a6acd2e47c5a8b31e580f66253cad2966e11d3cf
Parents: 3d84970
Author: Fabian Hueske <[email protected]>
Authored: Thu Jan 29 10:34:29 2015 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Feb 20 16:10:23 2015 +0100

----------------------------------------------------------------------
 flink-staging/flink-hcatalog/pom.xml            | 186 +++++++++
 .../flink/hcatalog/HCatInputFormatBase.java     | 413 +++++++++++++++++++
 .../flink/hcatalog/java/HCatInputFormat.java    | 140 +++++++
 .../flink/hcatalog/scala/HCatInputFormat.scala  | 210 ++++++++++
 flink-staging/pom.xml                           |   1 +
 5 files changed, 950 insertions(+)
----------------------------------------------------------------------
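
For context, a minimal usage sketch of the new Java API (not part of this commit; the database "mydb", table "mytable", and the surrounding driver program are placeholder assumptions):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.hcatalog.java.HCatInputFormat;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class HCatReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // By default, the InputFormat produces HCatRecords.
            HCatInputFormat<HCatRecord> hCatFormat =
                    new HCatInputFormat<HCatRecord>("mydb", "mytable");

            DataSet<HCatRecord> records = env.createInput(hCatFormat);
            // ... transformations and a sink, then env.execute()
        }
    }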


http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..8762b9c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+       
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hcatalog</artifactId>
+       <name>flink-hcatalog</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hive.hcatalog</groupId>
+                       <artifactId>hcatalog-core</artifactId>
+                       <version>0.12.0</version>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                                       <compilerPlugins>
+                                               <compilerPlugin>
+                                                       <groupId>org.scalamacros</groupId>
+                                                       <artifactId>paradise_${scala.version}</artifactId>
+                                                       <version>${scala.macros.version}</version>
+                                               </compilerPlugin>
+                                       </compilerPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..59a6719
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for primitive type fields
+ * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
+ *
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Configuration configuration;
+
+       private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+       private RecordReader<WritableComparable, HCatRecord> recordReader;
+       private boolean fetched = false;
+       private boolean hasNext;
+
+       protected String[] fieldNames = new String[0];
+       protected HCatSchema outputSchema;
+
+       private TypeInformation<T> resultType;
+
+       public HCatInputFormatBase() { }
+
+       /**
+        * Creates a HCatInputFormat for the given database and table.
+        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+        * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+        * {@link HCatInputFormatBase#asFlinkTuples()}.
+        *
+        * @param database The name of the database to read from.
+        * @param table The name of the table to read.
+        * @throws java.io.IOException
+        */
+       public HCatInputFormatBase(String database, String table) throws IOException {
+               this(database, table, new Configuration());
+       }
+
+       /**
+        * Creates a HCatInputFormat for the given database, table, and
+        * {@link org.apache.hadoop.conf.Configuration}.
+        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+        * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+        * {@link HCatInputFormatBase#asFlinkTuples()}.
+        *
+        * @param database The name of the database to read from.
+        * @param table The name of the table to read.
+        * @param config The Configuration for the InputFormat.
+        * @throws java.io.IOException
+        */
+       public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+               super();
+               this.configuration = config;
+               HadoopUtils.mergeHadoopConf(this.configuration);
+
+               this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+               this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+               // configure output schema of HCatFormat
+               configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+               // set type information
+               this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+       }
+
+       /**
+        * Specifies the fields which are returned by the InputFormat and their order.
+        *
+        * @param fields The fields and their order which are returned by the InputFormat.
+        * @return This InputFormat with specified return fields.
+        * @throws java.io.IOException
+        */
+       public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+               // build output schema
+               ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+               for(String field : fields) {
+                       fieldSchemas.add(this.outputSchema.get(field));
+               }
+               this.outputSchema = new HCatSchema(fieldSchemas);
+
+               // update output schema configuration
+               configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+               return this;
+       }
+
+       /**
+        * Specifies a SQL-like filter condition on the table's partition columns.
+        * Filter conditions on non-partition columns are invalid.
+        * A partition filter can significantly reduce the amount of data to be read.
+        *
+        * @param filter A SQL-like filter condition on the table's partition columns.
+        * @return This InputFormat with specified partition filter.
+        * @throws java.io.IOException
+        */
+       public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+               // set filter
+               this.hCatInputFormat.setFilter(filter);
+
+               return this;
+       }
+
+       /**
+        * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
+        * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
+        * At the moment, the following restrictions apply for returning Flink tuples:
+        *
+        * <ul>
+        *     <li>Only primitive type fields can be returned in Flink Tuples
+        *          (no STRUCT, MAP, ARRAY data types).</li>
+        *     <li>Only a limited number of fields can be returned as Flink Tuple.</li>
+        * </ul>
+        *
+        * @return This InputFormat.
+        * @throws org.apache.hive.hcatalog.common.HCatException
+        */
+       public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+               // build type information
+               int numFields = outputSchema.getFields().size();
+               if(numFields > this.getMaxFlinkTupleSize()) {
+                       throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+                                       " fields can be returned as Flink tuples.");
+               }
+
+               TypeInformation[] fieldTypes = new TypeInformation[numFields];
+               fieldNames = new String[numFields];
+               for (String fieldName : outputSchema.getFieldNames()) {
+                       HCatFieldSchema field = outputSchema.get(fieldName);
+
+                       int fieldPos = outputSchema.getPosition(fieldName);
+                       TypeInformation fieldType = getFieldType(field);
+
+                       fieldTypes[fieldPos] = fieldType;
+                       fieldNames[fieldPos] = fieldName;
+
+               }
+               this.resultType = new TupleTypeInfo(fieldTypes);
+
+               return this;
+       }
+
+       protected abstract int getMaxFlinkTupleSize();
+
+       private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+               switch(fieldSchema.getType()) {
+                       case INT:
+                               return BasicTypeInfo.INT_TYPE_INFO;
+                       case TINYINT:
+                               return BasicTypeInfo.BYTE_TYPE_INFO;
+                       case SMALLINT:
+                               return BasicTypeInfo.SHORT_TYPE_INFO;
+                       case BIGINT:
+                               return BasicTypeInfo.LONG_TYPE_INFO;
+                       case BOOLEAN:
+                               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+                       case FLOAT:
+                               return BasicTypeInfo.FLOAT_TYPE_INFO;
+                       case DOUBLE:
+                               return BasicTypeInfo.DOUBLE_TYPE_INFO;
+                       case STRING:
+                               return BasicTypeInfo.STRING_TYPE_INFO;
+                       case BINARY:
+                               return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+                       case ARRAY:
+                               throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet.");
+                       case MAP:
+                               throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet.");
+                       case STRUCT:
+                               throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet.");
+                       default:
+                               throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+               }
+       }
+
+       /**
+        * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+        *
+        * @return The Configuration of the HCatInputFormat.
+        */
+       public Configuration getConfiguration() {
+               return this.configuration;
+       }
+
+       /**
+        * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
+        * returned by this InputFormat.
+        *
+        * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+        */
+       public HCatSchema getOutputSchema() {
+               return this.outputSchema;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  InputFormat
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void configure(org.apache.flink.configuration.Configuration parameters) {
+               // nothing to do
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+               // no statistics provided at the moment
+               return null;
+       }
+
+       @Override
+       public HadoopInputSplit[] createInputSplits(int minNumSplits)
+                       throws IOException {
+               configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+               JobContext jobContext = null;
+               try {
+                       jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               List<InputSplit> splits;
+               try {
+                       splits = this.hCatInputFormat.getSplits(jobContext);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not get Splits.", e);
+               }
+               HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+               for(int i = 0; i < hadoopInputSplits.length; i++){
+                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+               }
+               return hadoopInputSplits;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+
+       @Override
+       public void open(HadoopInputSplit split) throws IOException {
+               TaskAttemptContext context = null;
+               try {
+                       context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+               } catch(Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               try {
+                       this.recordReader = this.hCatInputFormat
+                                       .createRecordReader(split.getHadoopInputSplit(), context);
+                       this.recordReader.initialize(split.getHadoopInputSplit(), context);
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not create RecordReader.", e);
+               } finally {
+                       this.fetched = false;
+               }
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               if(!this.fetched) {
+                       fetchNext();
+               }
+               return !this.hasNext;
+       }
+
+       private void fetchNext() throws IOException {
+               try {
+                       this.hasNext = this.recordReader.nextKeyValue();
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not fetch next KeyValue pair.", e);
+               } finally {
+                       this.fetched = true;
+               }
+       }
+
+       @Override
+       public T nextRecord(T record) throws IOException {
+               if(!this.fetched) {
+                       // first record
+                       fetchNext();
+               }
+               if(!this.hasNext) {
+                       return null;
+               }
+               try {
+
+                       // get next HCatRecord
+                       HCatRecord v = this.recordReader.getCurrentValue();
+                       this.fetched = false;
+
+                       if(this.fieldNames.length > 0) {
+                               // return as Flink tuple
+                               return this.buildFlinkTuple(record, v);
+
+                       } else {
+                               // return as HCatRecord
+                               return (T)v;
+                       }
+
+               } catch (InterruptedException e) {
+                       throw new IOException("Could not get next record.", e);
+               }
+       }
+
+       protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
+
+       @Override
+       public void close() throws IOException {
+               this.recordReader.close();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Custom de/serialization methods
+       // --------------------------------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(this.fieldNames.length);
+               for(String fieldName : this.fieldNames) {
+                       out.writeUTF(fieldName);
+               }
+               this.configuration.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               this.fieldNames = new String[in.readInt()];
+               for(int i=0; i<this.fieldNames.length; i++) {
+                       this.fieldNames[i] = in.readUTF();
+               }
+
+               Configuration configuration = new Configuration();
+               configuration.readFields(in);
+
+               if(this.configuration == null) {
+                       this.configuration = configuration;
+               }
+
+               this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+               this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Result type business
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return this.resultType;
+       }
+
+}
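
A hedged sketch of the projection and partition-filter hooks defined above (getFields() and withFilter() mutate the format and return it for chaining); the column names "id" and "name" and the partition column "ds" are made-up examples, not taken from this commit:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.hcatalog.java.HCatInputFormat;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class HCatProjectionFilterSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            HCatInputFormat<HCatRecord> hCatFormat =
                    new HCatInputFormat<HCatRecord>("mydb", "mytable");

            // Read only two columns and prune partitions before any data is read.
            hCatFormat
                    .getFields("id", "name")
                    .withFilter("ds='2015-01-29'");

            DataSet<HCatRecord> records = env.createInput(hCatFormat);
            // ... transformations and a sink, then env.execute()
        }
    }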

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..c3c3a1c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 25 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ * @param <T>
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+       private static final long serialVersionUID = 1L;
+
+       public HCatInputFormat() {}
+
+       public HCatInputFormat(String database, String table) throws Exception {
+               super(database, table);
+       }
+
+       public HCatInputFormat(String database, String table, Configuration config) throws Exception {
+               super(database, table, config);
+       }
+
+
+       @Override
+       protected int getMaxFlinkTupleSize() {
+               return 25;
+       }
+
+       @Override
+       protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
+
+               Tuple tuple = (Tuple)t;
+
+               // Extract all fields from HCatRecord
+               for(int i=0; i < this.fieldNames.length; i++) {
+
+                       // get field value
+                       Object o = record.get(this.fieldNames[i], this.outputSchema);
+
+                       // Set field value in Flink tuple.
+                       // Partition columns are returned as String and
+                       //   need to be converted to original type.
+                       switch(this.outputSchema.get(i).getType()) {
+                               case INT:
+                                       if(o instanceof String) {
+                                               tuple.setField(Integer.parseInt((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case TINYINT:
+                                       if(o instanceof String) {
+                                               tuple.setField(Byte.parseByte((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case SMALLINT:
+                                       if(o instanceof String) {
+                                               tuple.setField(Short.parseShort((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case BIGINT:
+                                       if(o instanceof String) {
+                                               tuple.setField(Long.parseLong((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case BOOLEAN:
+                                       if(o instanceof String) {
+                                               tuple.setField(Boolean.parseBoolean((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case FLOAT:
+                                       if(o instanceof String) {
+                                               tuple.setField(Float.parseFloat((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case DOUBLE:
+                                       if(o instanceof String) {
+                                               tuple.setField(Double.parseDouble((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case STRING:
+                                       tuple.setField(o, i);
+                                       break;
+                               case BINARY:
+                                       if(o instanceof String) {
+                                               throw new RuntimeException("Cannot handle partition keys of type BINARY.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               default:
+                                       throw new RuntimeException("Invalid Type");
+                       }
+               }
+
+               return (T)tuple;
+
+       }
+
+}
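
And a sketch of the tuple mode added by this class (asFlinkTuples() switches the produced type from HCatRecord to a Flink Tuple); again, the table and column names are assumptions:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hcatalog.java.HCatInputFormat;

    public class HCatTupleSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Select two primitive-typed columns and return them as Flink Tuple2.
            HCatInputFormat<Tuple2<Integer, String>> hCatFormat =
                    new HCatInputFormat<Tuple2<Integer, String>>("mydb", "mytable");
            hCatFormat
                    .getFields("id", "name")
                    .asFlinkTuples();

            DataSet<Tuple2<Integer, String>> rows = env.createInput(hCatFormat);
            // ... transformations and a sink, then env.execute()
        }
    }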

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..7cc18f0
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or
+ * Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 22 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ */
+class HCatInputFormat[T](
+                        database: String,
+                        table: String,
+                        config: Configuration
+                          ) extends HCatInputFormatBase[T](database, table, config) {
+
+  def this(database: String, table: String) {
+    this(database, table, new Configuration)
+  }
+
+  var vals: Array[Any] = Array[Any]()
+
+  override def configure(parameters: configuration.Configuration): Unit = {
+    super.configure(parameters)
+    vals = new Array[Any](fieldNames.length)
+  }
+
+  override protected def getMaxFlinkTupleSize: Int = 22
+
+  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+    // Extract all fields from HCatRecord
+    var i: Int = 0
+    while (i < this.fieldNames.length) {
+
+        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+        // partition columns are returned as String
+        //   Check and convert to actual type.
+        this.outputSchema.get(i).getType match {
+          case HCatFieldSchema.Type.INT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt
+            }
+            else {
+              vals(i) = o.asInstanceOf[Int]
+            }
+          case HCatFieldSchema.Type.TINYINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toByte
+            }
+            else {
+              vals(i) = o.asInstanceOf[Byte]
+            }
+          case HCatFieldSchema.Type.SMALLINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toShort
+            }
+            else {
+              vals(i) = o.asInstanceOf[Short]
+            }
+          case HCatFieldSchema.Type.BIGINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toLong
+            }
+            else {
+              vals(i) = o.asInstanceOf[Long]
+            }
+          case HCatFieldSchema.Type.BOOLEAN =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toBoolean
+            }
+            else {
+              vals(i) = o.asInstanceOf[Boolean]
+            }
+          case HCatFieldSchema.Type.FLOAT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toFloat
+            }
+            else {
+              vals(i) = o.asInstanceOf[Float]
+            }
+          case HCatFieldSchema.Type.DOUBLE =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toDouble
+            }
+            else {
+              vals(i) = o.asInstanceOf[Double]
+            }
+          case HCatFieldSchema.Type.STRING =>
+            vals(i) = o
+          case HCatFieldSchema.Type.BINARY =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[Array[Byte]]
+            }
+          case _ =>
+            throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
+              " encountered.")
+        }
+
+        i += 1
+      }
+    createScalaTuple(vals)
+  }
+
+  private def createScalaTuple(vals: Array[Any]): T = {
+
+    this.fieldNames.length match {
+      case 1 =>
+        new Tuple1(vals(0)).asInstanceOf[T]
+      case 2 =>
+        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+      case 3 =>
+        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+      case 4 =>
+        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+      case 5 =>
+        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+      case 6 =>
+        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+      case 7 =>
+        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+      case 8 =>
+        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+          .asInstanceOf[T]
+      case 9 =>
+        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8)).asInstanceOf[T]
+      case 10 =>
+        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9)).asInstanceOf[T]
+      case 11 =>
+        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10)).asInstanceOf[T]
+      case 12 =>
+        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+      case 13 =>
+        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+      case 14 =>
+        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+      case 15 =>
+        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+      case 16 =>
+        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+          .asInstanceOf[T]
+      case 17 =>
+        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16)).asInstanceOf[T]
+      case 18 =>
+        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17)).asInstanceOf[T]
+      case 19 =>
+        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18)).asInstanceOf[T]
+      case 20 =>
+        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+      case 21 =>
+        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+      case 22 =>
+        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+      case _ =>
+        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+
+  }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index ea59e70..1b98cbe 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -41,6 +41,7 @@ under the License.
                <module>flink-streaming</module>
                <module>flink-hbase</module>
                <module>flink-gelly</module>
+               <module>flink-hcatalog</module>
        </modules>
        
        <!-- See main pom.xml for explanation of profiles -->
