CRUNCH-340: added HCatSource & HCatTarget

Signed-off-by: Josh Wills <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5609b014
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5609b014
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5609b014

Branch: refs/heads/master
Commit: 5609b014378d3460a55ce25522f0c00659872807
Parents: 2469348
Author: Stephen Durfey <[email protected]>
Authored: Mon Dec 4 10:49:59 2017 -0600
Committer: Josh Wills <[email protected]>
Committed: Sun Dec 10 08:27:13 2017 -0800

----------------------------------------------------------------------
 crunch-hcatalog/pom.xml                         | 153 +++++++++
 .../crunch/io/hcatalog/HCatSourceITSpec.java    | 322 ++++++++++++++++++
 .../crunch/io/hcatalog/HCatTargetITSpec.java    | 308 +++++++++++++++++
 .../crunch/io/hcatalog/HCatTestSuiteIT.java     | 120 +++++++
 .../crunch/io/hcatalog/HCatTestUtils.java       | 234 +++++++++++++
 .../src/it/resources/log4j.properties           |  30 ++
 .../org/apache/crunch/io/hcatalog/FromHCat.java |  85 +++++
 .../io/hcatalog/HCatRecordDataIterable.java     | 124 +++++++
 .../io/hcatalog/HCatRecordDataReadable.java     |  66 ++++
 .../crunch/io/hcatalog/HCatSourceTarget.java    | 330 +++++++++++++++++++
 .../apache/crunch/io/hcatalog/HCatTarget.java   | 240 ++++++++++++++
 .../org/apache/crunch/io/hcatalog/ToHCat.java   | 123 +++++++
 .../CrunchDefaultOutputCommitterContainer.java  |  63 ++++
 .../CrunchDefaultOutputFormatContainer.java     |  39 +++
 .../CrunchFileOutputCommitterContainer.java     |  92 ++++++
 .../CrunchFileOutputFormatContainer.java        |  52 +++
 .../mapreduce/CrunchHCatOutputFormat.java       |  52 +++
 .../hcatalog/mapreduce/HCatMapRedUtils.java     |  62 ++++
 .../hive/hcatalog/mapreduce/package-info.java   |  50 +++
 pom.xml                                         |  16 +-
 20 files changed, 2560 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/pom.xml b/crunch-hcatalog/pom.xml
new file mode 100644
index 0000000..e99814b
--- /dev/null
+++ b/crunch-hcatalog/pom.xml
@@ -0,0 +1,153 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch-hcatalog</artifactId>
+  <name>Apache Crunch HCatalog Support</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-minicluster</artifactId>
+     <scope>test</scope>
+    </dependency>
+
+    <!-- Used by Hive in integration tests -->
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-shell</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-hbase</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-hbase-handler</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+     </plugins>
+   </build>   
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java
new file mode 100644
index 0000000..676b1fe
--- /dev/null
+++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class HCatSourceITSpec extends CrunchTestSupport {
+
+  private static IMetaStoreClient client;
+  private static TemporaryPath temporaryPath;
+  private static Configuration conf;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @BeforeClass
+  public static void setUp() throws Throwable {
+    HCatTestSuiteIT.startTest();
+    client = HCatTestSuiteIT.getClient();
+    temporaryPath = HCatTestSuiteIT.getRootPath();
+    conf = HCatTestSuiteIT.getConf();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HCatTestSuiteIT.endTest();
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = temporaryPath.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+    HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE);
+
+    Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
+    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
+    HCatSchema schema = src.getTableSchema(p.getConfiguration());
+    PCollection<HCatRecord> records = p.read(src);
+    List<Pair<Integer, String>> mat = Lists.newArrayList(
+        records.parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings()))
+            .materialize());
+    p.done();
+    assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), mat);
+  }
+
+  @Test
+  public void testReadable() throws Exception {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = temporaryPath.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+    HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
+
+    Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
+    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
+    HCatSchema schema = src.getTableSchema(p.getConfiguration());
+    PCollection<HCatRecord> records = p.read(src);
+
+    ReadableData<HCatRecord> readable = records.asReadable(true);
+    TaskInputOutputContext mockTIOC = Mockito.mock(TaskInputOutputContext.class);
+    when(mockTIOC.getConfiguration()).thenReturn(conf);
+    readable.configure(conf);
+
+    Iterator<HCatRecord> iterator = readable.read(mockTIOC).iterator();
+    HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
+    List<Pair<Integer, String>> results = new ArrayList<>();
+    while (iterator.hasNext()) {
+      results.add(fn.map(iterator.next()));
+    }
+
+    p.done();
+    assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
+  }
+
+  @Test
+  public void testmaterialize() throws Exception {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = temporaryPath.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+    HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
+
+    Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
+    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
+    HCatSchema schema = src.getTableSchema(p.getConfiguration());
+    PCollection<HCatRecord> records = p.read(src);
+
+    // force the materialize here on the HCatRecords themselves ... then
+    // transform
+    Iterable<HCatRecord> materialize = records.materialize();
+    HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
+    List<Pair<Integer, String>> results = new ArrayList<>();
+    for (final HCatRecord record : materialize) {
+      results.add(fn.map(record));
+    }
+
+    p.done();
+    assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
+  }
+
+  @Test
+  public void testMaterialize_partitionedTable_multiplePartitionsRequested() throws Exception {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = temporaryPath.getPath(tableName);
+    String part1Data = "17,josh\n29,indiana\n";
+    String part1Value = "1234";
+    Path partition1Location = new Path(tableRootLocation, part1Value);
+    String part2Data = "42,jackie\n17,ohio\n";
+    String part2Value = "5678";
+    Path partition2Location = new Path(tableRootLocation, part2Value);
+    writeDataToHdfs(part1Data, partition1Location, conf);
+    writeDataToHdfs(part2Data, partition2Location, conf);
+
+    FieldSchema partitionSchema = new FieldSchema();
+    partitionSchema.setName("timestamp");
+    partitionSchema.setType("string");
+
+    Table table = HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation,
+        Collections.singletonList(partitionSchema));
+    client
+        .add_partition(HCatTestUtils.createPartition(table, partition1Location, Collections.singletonList(part1Value)));
+    client
+        .add_partition(HCatTestUtils.createPartition(table, partition2Location, Collections.singletonList(part2Value)));
+
+    Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
+    String filter = "timestamp=\"" + part1Value + "\" or timestamp=\"" + part2Value + "\"";
+    // HCatSource src = new HCatSource("default", tableName, filter);
+    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table("default", tableName, filter);
+
+    HCatSchema schema = src.getTableSchema(p.getConfiguration());
+    PCollection<HCatRecord> records = p.read(src);
+    // force the materialize here on the HCatRecords themselves ... then
+    // transform
+    Iterable<HCatRecord> materialize = records.materialize();
+    HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema);
+    List<Pair<Integer, String>> results = new ArrayList<>();
+    for (final HCatRecord record : materialize) {
+      results.add(fn.map(record));
+    }
+
+    p.done();
+    assertEquals(
+        ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana"), Pair.of(42, "jackie"), Pair.of(17, "ohio")),
+        results);
+  }
+
+  @Test
+  public void testGroupBy() throws Exception {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = temporaryPath.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+    HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation);
+
+    Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf);
+    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName);
+
+    HCatSchema schema = src.getTableSchema(p.getConfiguration());
+    PCollection<HCatRecord> records = p.read(src);
+    // can't use HCatRecord here as the intermediate output is written out by
+    // hadoop, and there is
+    // an explicit check to ensure that the type being written out matches the
+    // defined output type.
+    // e.g. DefaultHCatRecord != HCatRecord, therefore an exception is thrown
+    PTable<String, DefaultHCatRecord> table = records.parallelDo(new HCatTestUtils.Fns.GroupByHCatRecordFn(),
+        Writables.tableOf(Writables.strings(), Writables.writables(DefaultHCatRecord.class)));
+
+    PTable<Integer, String> finaltable = table.groupByKey().parallelDo(new HCatTestUtils.Fns.HCatRecordMapFn(schema),
+        Avros.tableOf(Avros.ints(), Avros.strings()));
+
+    List<Pair<Integer, String>> results = new ArrayList<>();
+    for (final Map.Entry<Integer, String> entry : finaltable.materializeToMap().entrySet()) {
+      results.add(Pair.of(entry.getKey(), entry.getValue()));
+    }
+
+    p.done();
+
+    assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results);
+  }
+
+  @Test
+  public void test_HCatRead_NonNativeTable_HBase() throws Exception {
+    HBaseTestingUtility hbaseTestUtil = null;
+    try {
+      String db = "default";
+      String hiveTable = "test";
+      Configuration hbaseConf = HBaseConfiguration.create(conf);
+      hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
+      hbaseTestUtil.startMiniZKCluster();
+      hbaseTestUtil.startMiniHBaseCluster(1, 1);
+
+      org.apache.hadoop.hbase.client.Table table = hbaseTestUtil.createTable(TableName.valueOf("test-table"), "fam");
+
+      String key1 = "this-is-a-key";
+      Put put = new Put(Bytes.toBytes(key1));
+      put.addColumn("fam".getBytes(), "foo".getBytes(), "17".getBytes());
+      table.put(put);
+      String key2 = "this-is-a-key-too";
+      Put put2 = new Put(Bytes.toBytes(key2));
+      put2.addColumn("fam".getBytes(), "foo".getBytes(), "29".getBytes());
+      table.put(put2);
+      table.close();
+
+      org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, hiveTable);
+      tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      tbl.setTableType(TableType.EXTERNAL_TABLE);
+
+      FieldSchema f1 = new FieldSchema();
+      f1.setName("foo");
+      f1.setType("int");
+      FieldSchema f2 = new FieldSchema();
+      f2.setName("key");
+      f2.setType("string");
+
+      tbl.setProperty("hbase.table.name", "test-table");
+      tbl.setProperty("hbase.mapred.output.outputtable", "test-table");
+      tbl.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
+      tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+      tbl.setFields(ImmutableList.of(f1, f2));
+      tbl.setSerdeParam("hbase.columns.mapping", "fam:foo,:key");
+      this.client.createTable(tbl.getTTable());
+
+      Pipeline p = new MRPipeline(HCatSourceITSpec.class, hbaseConf);
+      HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(hiveTable);
+
+      HCatSchema schema = src.getTableSchema(p.getConfiguration());
+      PCollection<HCatRecord> records = p.read(src);
+      List<Pair<String, Integer>> mat = Lists.newArrayList(
+          records.parallelDo(new HCatTestUtils.Fns.KeyMapPairFn(schema), Avros.tableOf(Avros.strings(), Avros.ints()))
+              .materialize());
+
+      p.done();
+
+      assertEquals(ImmutableList.of(Pair.of(key1, 17), Pair.of(key2, 29)), mat);
+    } finally {
+      if (hbaseTestUtil != null) {
+        hbaseTestUtil.shutdownMiniHBaseCluster();
+        hbaseTestUtil.shutdownMiniZKCluster();
+      }
+    }
+  }
+
+  // writes data to the specified location and ensures the directory exists
+  // prior to writing
+  private Path writeDataToHdfs(String data, Path location, Configuration conf) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    Path writeLocation = new Path(location, UUID.randomUUID().toString());
+    fs.mkdirs(location);
+    fs.create(writeLocation);
+    ByteArrayInputStream baos = new ByteArrayInputStream(data.getBytes("UTF-8"));
+    try (FSDataOutputStream fos = fs.create(writeLocation)) {
+      IOUtils.copy(baos, fos);
+    }
+
+    return writeLocation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java
new file mode 100644
index 0000000..941d2d8
--- /dev/null
+++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static junit.framework.Assert.assertEquals;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class HCatTargetITSpec extends CrunchTestSupport {
+
+  private static IMetaStoreClient client;
+  private static Configuration conf;
+  private static TemporaryPath tempDir;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @BeforeClass
+  public static void setUp() throws Throwable {
+    HCatTestSuiteIT.startTest();
+    client = HCatTestSuiteIT.getClient();
+    conf = HCatTestSuiteIT.getConf();
+    tempDir = HCatTestSuiteIT.getRootPath();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HCatTestSuiteIT.endTest();
+  }
+
+  @Test
+  public void test_successfulWriteToHCatTarget() throws IOException, HiveException, TException {
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = tempDir.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+
+    FieldSchema partitionSchema = new FieldSchema();
+    partitionSchema.setName("timestamp");
+    partitionSchema.setType("string");
+    HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation,
+        Lists.newArrayList(partitionSchema));
+
+    Pipeline pipeline = new MRPipeline(HCatSourceITSpec.class, conf);
+    PCollection<String> contents = pipeline.readTextFile(tableRootLocation.toString());
+    PCollection<HCatRecord> hcatRecords = contents.parallelDo(new HCatTestUtils.Fns.MapHCatRecordFn(),
+        Writables.writables(HCatRecord.class));
+    Map<String, String> partitions = new HashMap<String, String>() {
+      {
+        {
+          put("timestamp", "1234");
+        }
+      }
+    };
+
+    pipeline.write(hcatRecords, ToHCat.table("default", tableName, partitions));
+    pipeline.run();
+
+    // ensure partition was created
+    List<Partition> partitionList = client.listPartitions("default", tableName, (short) 5);
+    assertThat(partitionList.size(), is(1));
+    Partition newPartition = Iterators.getOnlyElement(partitionList.iterator());
+    assertThat(newPartition.getValuesIterator().next(), is("1234"));
+
+    // read data from table to ensure it was written correctly
+    HCatSourceTarget source = (HCatSourceTarget) FromHCat.table("default", tableName, "timestamp='1234'");
+    PCollection<HCatRecord> read = pipeline.read(source);
+    HCatSchema schema = source.getTableSchema(pipeline.getConfiguration());
+    ArrayList<Pair<Integer, String>> mat = Lists.newArrayList(
+        read.parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings()))
+            .materialize());
+    assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), mat);
+    partitions = new HashMap<String, String>() {
+      {
+        {
+          put("timestamp", "5678");
+        }
+      }
+    };
+
+    pipeline.write(read, ToHCat.table("default", tableName, partitions));
+    pipeline.done();
+  }
+
+  @Test
+  public void test_successfulWriteToHCatTarget_GroupByKey() throws IOException, HiveException, TException {
+
+    String tableName = testName.getMethodName();
+    Path tableRootLocation = tempDir.getPath(tableName);
+    String data = "17,josh\n29,indiana\n";
+    writeDataToHdfs(data, tableRootLocation, conf);
+
+    FieldSchema partitionSchema = new FieldSchema();
+    partitionSchema.setName("timestamp");
+    partitionSchema.setType("string");
+    HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation,
+        Lists.newArrayList(partitionSchema));
+
+    Pipeline pipeline = new MRPipeline(HCatSourceITSpec.class, conf);
+    PCollection<String> contents = pipeline.readTextFile(tableRootLocation.toString());
+    PCollection<HCatRecord> hcatRecords = contents.parallelDo(new HCatTestUtils.Fns.MapHCatRecordFn(),
+        Writables.writables(HCatRecord.class));
+    Map<String, String> partitions = new HashMap<String, String>() {
+      {
+        {
+          put("timestamp", "1234");
+        }
+      }
+    };
+
+    HCatTarget target = new HCatTarget(tableName, partitions);
+
+    pipeline.write(hcatRecords, target);
+    pipeline.run();
+
+    // ensure partition was created
+    List<Partition> partitionList = client.listPartitions("default", tableName, (short) 5);
+    assertThat(partitionList.size(), is(1));
+    Partition newPartition = Iterators.getOnlyElement(partitionList.iterator());
+    assertThat(newPartition.getValuesIterator().next(), is("1234"));
+
+    // read data from table to ensure it was written correctly
+    HCatSourceTarget source = (HCatSourceTarget) FromHCat.table("default", tableName, "timestamp='1234'");
+    PCollection<HCatRecord> read = pipeline.read(source);
+    HCatSchema schema = source.getTableSchema(pipeline.getConfiguration());
+    PGroupedTable<String, DefaultHCatRecord> table = read.parallelDo(new HCatTestUtils.Fns.GroupByHCatRecordFn(),
+        Writables.tableOf(Writables.strings(), Writables.writables(DefaultHCatRecord.class))).groupByKey();
+
+    Iterable<Pair<Integer, String>> mat = table
+        .parallelDo(new HCatTestUtils.Fns.IterableToHCatRecordMapFn(), Writables.writables(HCatRecord.class))
+        .parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings()))
+        .materialize();
+
+
+    assertEquals(ImmutableList.of(Pair.of(29, "indiana"), Pair.of(17, "josh")), ImmutableList.copyOf(mat));
+    pipeline.done();
+  }
+
+  @Test
+  public void test_HCatTarget_WriteToNonNativeTable_HBase() throws Exception {
+    HBaseTestingUtility hbaseTestUtil = null;
+    try {
+      String db = "default";
+      String sourceHiveTable = "source_table";
+      String destinationHiveTable = "dest_table";
+      Configuration configuration = HBaseConfiguration.create(conf);
+      hbaseTestUtil = new HBaseTestingUtility(configuration);
+      hbaseTestUtil.startMiniZKCluster();
+      hbaseTestUtil.startMiniHBaseCluster(1, 1);
+
+      org.apache.hadoop.hbase.client.Table sourceTable = hbaseTestUtil.createTable(TableName.valueOf(sourceHiveTable),
+          "fam");
+
+      String key1 = "this-is-a-key";
+      Put put = new Put(Bytes.toBytes(key1));
+      put.addColumn("fam".getBytes(), "foo".getBytes(), "17".getBytes());
+      sourceTable.put(put);
+      String key2 = "this-is-a-key-too";
+      Put put2 = new Put(Bytes.toBytes(key2));
+      put2.addColumn("fam".getBytes(), "foo".getBytes(), "29".getBytes());
+      sourceTable.put(put2);
+      sourceTable.close();
+
+      // create Hive Table for source table
+      org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, sourceHiveTable);
+      tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      tbl.setTableType(TableType.EXTERNAL_TABLE);
+
+      FieldSchema f1 = new FieldSchema();
+      f1.setName("foo");
+      f1.setType("int");
+      FieldSchema f2 = new FieldSchema();
+      f2.setName("key");
+      f2.setType("string");
+
+      tbl.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
+      tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+      tbl.setFields(ImmutableList.of(f1, f2));
+      tbl.setSerdeParam("hbase.columns.mapping", "fam:foo,:key");
+      this.client.createTable(tbl.getTTable());
+
+      // creates destination table
+      hbaseTestUtil.createTable(TableName.valueOf(destinationHiveTable), "fam");
+      org.apache.hadoop.hive.ql.metadata.Table destTable = new org.apache.hadoop.hive.ql.metadata.Table(db,
+          destinationHiveTable);
+      destTable.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      destTable.setTableType(TableType.EXTERNAL_TABLE);
+
+      destTable.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler");
+      destTable.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+      destTable.setFields(ImmutableList.of(f1, f2));
+      destTable.setSerdeParam("hbase.columns.mapping", "fam:foo,:key");
+      this.client.createTable(destTable.getTTable());
+
+      Pipeline p = new MRPipeline(HCatSourceITSpec.class, configuration);
+      PCollection<HCatRecord> records = p.read(FromHCat.table(sourceHiveTable));
+      p.write(records, ToHCat.table(destinationHiveTable));
+      p.done();
+
+      Connection connection = null;
+      try {
+        Scan scan = new Scan();
+
+        connection = ConnectionFactory.createConnection(configuration);
+        org.apache.hadoop.hbase.client.Table table = connection.getTable(TableName.valueOf(destinationHiveTable));
+        ResultScanner scanner = table.getScanner(scan);
+
+        Result result = null;
+        List<Pair<String, Integer>> actual = new ArrayList<>();
+        while (((result = scanner.next()) != null)) {
+          String value = Bytes.toString(result.getValue("fam".getBytes(), "foo".getBytes()));
+          actual.add(Pair.of(Bytes.toString(result.getRow()), Integer.parseInt(value)));
+        }
+
+        Assert.assertEquals(ImmutableList.of(Pair.of(key1, 17), Pair.of(key2, 29)), actual);
+      } finally {
+        IOUtils.closeQuietly(connection);
+      }
+    } finally {
+      if (hbaseTestUtil != null) {
+        hbaseTestUtil.shutdownMiniHBaseCluster();
+        hbaseTestUtil.shutdownMiniZKCluster();
+      }
+    }
+  }
+
+  // writes data to the specified location and ensures the directory exists
+  // prior to writing
+  private Path writeDataToHdfs(String data, Path location, Configuration conf) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    Path writeLocation = new Path(location, UUID.randomUUID().toString());
+    fs.mkdirs(location);
+    fs.create(writeLocation);
+    ByteArrayInputStream baos = new ByteArrayInputStream(data.getBytes("UTF-8"));
+    try (FSDataOutputStream fos = fs.create(writeLocation)) {
+      IOUtils.copy(baos, fos);
+    }
+
+    return writeLocation;
+  }
+}
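
A minimal write-side sketch assembled from the same pieces the tests above exercise. The table name "events", its string partition column "timestamp", the input path, and the CsvToHCatRecordFn helper are illustrative assumptions, not part of this patch; the record conversion mirrors HCatTestUtils.Fns.MapHCatRecordFn.

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hcatalog.ToHCat;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;

public class HCatWriteSketch {

  /** Hypothetical CSV parser; same approach as HCatTestUtils.Fns.MapHCatRecordFn above. */
  static class CsvToHCatRecordFn extends MapFn<String, HCatRecord> {
    private transient HCatSchema schema;

    @Override
    public void initialize() {
      try {
        // schema of the destination table, resolved from the job configuration
        schema = HCatOutputFormat.getTableSchema(getConfiguration());
      } catch (IOException e) {
        throw new CrunchRuntimeException(e);
      }
    }

    @Override
    public HCatRecord map(String line) {
      try {
        String[] parts = line.split(",");
        DefaultHCatRecord record = new DefaultHCatRecord(schema.size());
        record.set("foo", schema, Integer.parseInt(parts[0]));
        record.set("bar", schema, parts[1]);
        return record;
      } catch (HCatException e) {
        throw new CrunchRuntimeException(e);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(HCatWriteSketch.class, new Configuration());
    PCollection<HCatRecord> records = pipeline
        .readTextFile("/data/events")  // illustrative input path
        .parallelDo(new CsvToHCatRecordFn(), Writables.writables(HCatRecord.class));
    // write everything into a single partition of the destination table
    Map<String, String> partition = Collections.singletonMap("timestamp", "20171204");
    pipeline.write(records, ToHCat.table("default", "events", partition));
    pipeline.done();
  }
}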

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java
new file mode 100644
index 0000000..b2f41d9
--- /dev/null
+++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Test suite that re-uses the same Hive metastore instance for all tests in the
+ * suite.
+ */
+@RunWith(Suite.class)
[email protected]({ HCatSourceITSpec.class, HCatTargetITSpec.class })
+public class HCatTestSuiteIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HCatTestSuiteIT.class);
+  private static boolean runAsSuite = false;
+
+  public static TemporaryPath hadoopTempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir");
+
+  static HiveConf hconf;
+  static IMetaStoreClient client;
+  static Configuration conf = null;
+
+  @BeforeClass
+  public static void startSuite() throws Exception {
+    runAsSuite = true;
+    setupFileSystem();
+    setupMetaStore();
+  }
+
+  @AfterClass
+  public static void endSuite() throws Exception {
+    cleanup();
+  }
+
+  public static Configuration getConf() {
+    return conf;
+  }
+
+  public static TemporaryPath getRootPath() {
+    return hadoopTempDir;
+  }
+
+  public static IMetaStoreClient getClient() {
+    return client;
+  }
+
+  private static void setupMetaStore() throws Exception {
+    conf = hadoopTempDir.getDefaultConfiguration();
+    // set the warehouse location to the location of the temp dir, so managed
+    // tables return a size estimate of the table
+    String databaseLocation = hadoopTempDir.getPath("metastore_db").toString();
+    String derbyLocation = hadoopTempDir.getPath("derby.log").toString();
+    String jdbcUrl = "jdbc:derby:;databaseName=" + databaseLocation + ";create=true";
+    conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, jdbcUrl);
+    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), hadoopTempDir.getRootPath().toString());
+    // allow HMS to create any tables necessary
+    conf.set("datanucleus.schema.autoCreateTables", "true");
+    // disable verification as the tables won't exist at startup
+    conf.set("hive.metastore.schema.verification", "false");
+    // write derby logs to the temp directory to be cleaned up automagically after the test runs
+    System.setProperty("derby.stream.error.file", derbyLocation);
+    hconf = HCatUtil.getHiveConf(conf);
+    client = HCatUtil.getHiveMetastoreClient(hconf);
+  }
+
+  private static void setupFileSystem() throws Exception {
+    try {
+      hadoopTempDir.create();
+    } catch (Throwable throwable) {
+      throw (Exception) throwable;
+    }
+  }
+
+  public static void startTest() throws Exception {
+    if (!runAsSuite) {
+      setupFileSystem();
+      setupMetaStore();
+    }
+  }
+
+  public static void endTest() throws Exception {
+    if (!runAsSuite) {
+      cleanup();
+    }
+  }
+
+  private static void cleanup() throws IOException {
+    hadoopTempDir.delete();
+    client.close();
+  }
+}
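
A sketch of how an additional IT class would hook into this suite, mirroring the setUp/tearDown pattern in HCatSourceITSpec above; MyHCatITSpec is a hypothetical class name and would also need to be listed in @Suite.SuiteClasses so the shared metastore is reused when the suite runs.

package org.apache.crunch.io.hcatalog;

import org.apache.crunch.test.CrunchTestSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class MyHCatITSpec extends CrunchTestSupport {

  private static IMetaStoreClient client;
  private static Configuration conf;

  @BeforeClass
  public static void setUp() throws Throwable {
    // no-op when running under the suite; sets up HDFS and the metastore when standalone
    HCatTestSuiteIT.startTest();
    client = HCatTestSuiteIT.getClient();
    conf = HCatTestSuiteIT.getConf();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    // only cleans up when running standalone
    HCatTestSuiteIT.endTest();
  }
}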

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java
new file mode 100644
index 0000000..0b047ea
--- /dev/null
+++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public class HCatTestUtils {
+
+  public static class Fns {
+
+    /**
+     * Maps an HCatRecord with a Key to a pair of the Key and the value of the
+     * column "foo"
+     */
+    public static class KeyMapPairFn extends MapFn<HCatRecord, Pair<String, Integer>> {
+
+      private HCatSchema schema;
+
+      public KeyMapPairFn(HCatSchema schema) {
+        this.schema = schema;
+      }
+
+      @Override
+      public Pair<String, Integer> map(HCatRecord input) {
+        try {
+          return Pair.of(input.getString("key", schema), input.getInteger("foo", schema));
+        } catch (HCatException e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+
+    /**
+     * Takes an HCatRecord and emits a Pair<Integer, String>. Assumes the
+     * columns in the record are "foo" (int) and "bar" (string).
+     */
+    public static class MapPairFn extends MapFn<HCatRecord, Pair<Integer, String>> {
+
+      private HCatSchema schema;
+
+      public MapPairFn(HCatSchema schema) {
+        this.schema = schema;
+      }
+
+      @Override
+      public Pair<Integer, String> map(HCatRecord input) {
+        try {
+          return Pair.of(input.getInteger("foo", schema), input.getString("bar", schema));
+        } catch (HCatException e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+
+    /**
+     * Simple MapFn that wraps the input record in a Pair, with the first
+     * element being "record". Useful when testing group-by with the value
+     * being HCatRecord.
+     */
+    public static class GroupByHCatRecordFn extends MapFn<HCatRecord, Pair<String, DefaultHCatRecord>> {
+
+      @Override
+      public Pair<String, DefaultHCatRecord> map(HCatRecord input) {
+        return Pair.of("record", (DefaultHCatRecord) input);
+      }
+    }
+
+    /**
+     * Takes the input iterable of DefaultHCatRecords and emits Pairs that
+     * contain the value of the columns "foo" and "bar"
+     */
+    public static class HCatRecordMapFn extends DoFn<Pair<String, Iterable<DefaultHCatRecord>>, Pair<Integer, String>> {
+
+      private HCatSchema schema;
+
+      public HCatRecordMapFn(HCatSchema schema) {
+        this.schema = schema;
+      }
+
+      @Override
+      public void process(Pair<String, Iterable<DefaultHCatRecord>> input, Emitter<Pair<Integer, String>> emitter) {
+        for (final HCatRecord record : input.second()) {
+          try {
+            emitter.emit(Pair.of(record.getInteger("foo", schema), record.getString("bar", schema)));
+          } catch (HCatException e) {
+            throw new CrunchRuntimeException(e);
+          }
+        }
+      }
+    }
+
+    /**
+     * Takes a CSV line and maps it into an HCatRecord
+     */
+    public static class MapHCatRecordFn extends MapFn<String, HCatRecord> {
+
+      static HCatSchema dataSchema;
+
+      @Override
+      public void initialize() {
+        try {
+          dataSchema = HCatOutputFormat.getTableSchema(getConfiguration());
+        } catch (IOException e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+
+      @Override
+      public HCatRecord map(String input) {
+        try {
+          return getHCatRecord(input.split(","));
+        } catch (HCatException e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+
+      private static HCatRecord getHCatRecord(String[] csvParts) throws HCatException {
+        // the size must be set, or all subsequent sets on the HCatRecord will
+        // fail; setting the size initializes the initial backing array
+        DefaultHCatRecord hcatRecord = new DefaultHCatRecord(dataSchema.size());
+
+        hcatRecord.set("foo", dataSchema, Integer.parseInt(csvParts[0]));
+        hcatRecord.set("bar", dataSchema, csvParts[1]);
+
+        return hcatRecord;
+      }
+    }
+
+    /**
+     * Takes an iterable of HCatRecords and emits each HCatRecord (turns a
+     * PTable into a PCollection)
+     */
+    public static class IterableToHCatRecordMapFn extends DoFn<Pair<String, Iterable<DefaultHCatRecord>>, HCatRecord> {
+
+      @Override
+      public void process(Pair<String, Iterable<DefaultHCatRecord>> input, Emitter<HCatRecord> emitter) {
+        for (final HCatRecord record : input.second()) {
+          emitter.emit(record);
+        }
+      }
+    }
+  }
+
+  public static Table createUnpartitionedTable(IMetaStoreClient client, String tableName, TableType type)
+      throws IOException, HiveException, TException {
+    return createTable(client, "default", tableName, type, null, Collections.<FieldSchema> emptyList());
+  }
+
+  public static Table createUnpartitionedTable(IMetaStoreClient client, String tableName, TableType type,
+      @Nullable Path datalocation) throws IOException, HiveException, TException {
+    return createTable(client, "default", tableName, type, datalocation, Collections.<FieldSchema> emptyList());
+  }
+
+  public static Table createTable(IMetaStoreClient client, String db, String tableName, TableType type,
+      @Nullable Path datalocation, List<FieldSchema> partCols) throws IOException, HiveException, TException {
+    org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, tableName);
+    tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+    tbl.setTableType(type);
+
+    if (datalocation != null)
+      tbl.setDataLocation(datalocation);
+
+    FieldSchema f1 = new FieldSchema();
+    f1.setName("foo");
+    f1.setType("int");
+    FieldSchema f2 = new FieldSchema();
+    f2.setName("bar");
+    f2.setType("string");
+
+    if (partCols != null && !partCols.isEmpty())
+      tbl.setPartCols(partCols);
+
+    tbl.setFields(ImmutableList.of(f1, f2));
+    tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+    tbl.setSerdeParam("field.delim", ",");
+    tbl.setSerdeParam("serialization.format", ",");
+    tbl.setInputFormatClass("org.apache.hadoop.mapred.TextInputFormat");
+    tbl.setOutputFormatClass("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
+    client.createTable(tbl.getTTable());
+
+    return client.getTable(db, tableName);
+  }
+
+  public static Partition createPartition(Table table, Path partLocation, List<String> partValues) {
+    Partition partition = new Partition();
+    partition.setDbName(table.getDbName());
+    partition.setTableName(table.getTableName());
+    partition.setSd(new StorageDescriptor(table.getSd()));
+    partition.setValues(partValues);
+    partition.getSd().setLocation(partLocation.toString());
+    return partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/it/resources/log4j.properties b/crunch-hcatalog/src/it/resources/log4j.properties
new file mode 100644
index 0000000..bb987d6
--- /dev/null
+++ b/crunch-hcatalog/src/it/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set root logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+        
+# Log warnings on Hadoop for the local runner when testing
+log4j.logger.org.apache.hadoop=warn, A
+# Except for Configuration, which is chatty.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+log4j.appender.A.org.apache.derby=debug
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java
new file mode 100644
index 0000000..3988f00
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import org.apache.crunch.Source;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Static factory methods for creating sources to read from HCatalog.
+ *
+ * Access examples:
+ * <pre>
+ * {@code
+ *
+ *  Pipeline pipeline = new MRPipeline(this.getClass());
+ *
+ *  PCollection<HCatRecord> hcatRecords = pipeline.read(FromHCat.table("my-table"));
+ * }
+ * </pre>
+ */
+public final class FromHCat {
+
+  private FromHCat() {
+  }
+
+  /**
+   * Creates a {@code Source<HCatRecord>} instance from a hive table in the
+   * default database instance "default".
+   *
+   * @param table
+   *          table name
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public static Source<HCatRecord> table(String table) {
+    return new HCatSourceTarget(table);
+  }
+
+  /**
+   * Creates a {@code Source<HCatRecord>} instance from a hive table.
+   *
+   * @param database
+   *          database name
+   * @param table
+   *          table name
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public static Source<HCatRecord> table(String database, String table) {
+    return new HCatSourceTarget(database, table);
+  }
+
+  /**
+   * Creates a {@code Source<HCatRecord>} instance from a hive table with custom
+   * filter criteria. If {@code database} is null, uses the default
+   * database instance "default".
+   *
+   * @param database
+   *          database name
+   * @param table
+   *          table name
+   * @param filter
+   *          a custom filter criteria, e.g. specify partitions by
+   *          {@code 'date= "20140424"'} or {@code 'date < "20140424"'}
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public static Source<HCatRecord> table(@Nullable String database, String table, String filter) {
+    return new HCatSourceTarget(database, table, filter);
+  }
+}
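
A minimal read-side usage sketch for these factory methods; the database/table names "default"/"events", the partition filter value, and the "foo"/"bar" columns are illustrative assumptions borrowed from the integration tests above.

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hcatalog.FromHCat;
import org.apache.crunch.io.hcatalog.HCatSourceTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

public class HCatReadSketch {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(HCatReadSketch.class, new Configuration());
    // restrict the read to a single partition via the filter overload
    HCatSourceTarget src = (HCatSourceTarget) FromHCat.table("default", "events", "timestamp=\"20171204\"");
    HCatSchema schema = src.getTableSchema(pipeline.getConfiguration());
    PCollection<HCatRecord> records = pipeline.read(src);
    for (HCatRecord record : records.materialize()) {
      // column access requires the table schema resolved above
      System.out.println(record.getInteger("foo", schema) + "\t" + record.getString("bar", schema));
    }
    pipeline.done();
  }
}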

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java
new file mode 100644
index 0000000..0f1d360
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class HCatRecordDataIterable implements Iterable<HCatRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HCatRecordDataIterable.class);
+
+  private final FormatBundle<HCatInputFormat> bundle;
+  private final Configuration conf;
+
+  public HCatRecordDataIterable(FormatBundle<HCatInputFormat> bundle, 
Configuration configuration) {
+    this.bundle = bundle;
+    this.conf = configuration;
+  }
+
+  @Override
+  public Iterator<HCatRecord> iterator() {
+    try {
+      Job job = Job.getInstance(bundle.configure(conf));
+
+      final InputFormat fmt = 
ReflectionUtils.newInstance(bundle.getFormatClass(), conf);
+      final TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
+
+      return Iterators.concat(Lists.transform(fmt.getSplits(job), new 
Function<InputSplit, Iterator<HCatRecord>>() {
+
+        @Override
+        public Iterator<HCatRecord> apply(InputSplit split) {
+          RecordReader reader = null;
+          try {
+            reader = fmt.createRecordReader(split, ctxt);
+            reader.initialize(split, ctxt);
+          } catch (IOException | InterruptedException e) {
+            throw new CrunchRuntimeException(e);
+          }
+          return new HCatRecordReaderIterator(reader);
+        }
+      }).iterator());
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  private static class HCatRecordReaderIterator<T> implements Iterator<T> {
+
+    private final RecordReader<WritableComparable, T> reader;
+    private boolean hasNext;
+    private T current;
+
+    public HCatRecordReaderIterator(RecordReader reader) {
+      this.reader = reader;
+      try {
+        hasNext = reader.nextKeyValue();
+        if (hasNext)
+          current = this.reader.getCurrentValue();
+      } catch (IOException | InterruptedException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      T ret = current;
+      try {
+        hasNext = reader.nextKeyValue();
+
+        if (hasNext) {
+          current = reader.getCurrentValue();
+        }
+      } catch (IOException | InterruptedException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return ret;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Removing elements is not 
supported");
+    }
+  }
+}
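
HCatRecordDataIterable is what backs HCatSourceTarget#read(Configuration): each
input split is opened with a RecordReader and the results are concatenated into
a single iterator. A small client-side sketch (the table name is hypothetical,
and read() can throw IOException):

    HCatSourceTarget source = new HCatSourceTarget("events");
    Iterable<HCatRecord> records = source.read(new Configuration());
    for (HCatRecord record : records) {
      System.out.println(record.size());  // number of fields in the record
    }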

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java
----------------------------------------------------------------------
diff --git 
a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java
 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java
new file mode 100644
index 0000000..e4d4225
--- /dev/null
+++ 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public class HCatRecordDataReadable implements ReadableData<HCatRecord> {
+
+  private final FormatBundle<HCatInputFormat> bundle;
+  private final String database;
+  private final String table;
+  private final String filter;
+
+  public HCatRecordDataReadable(FormatBundle<HCatInputFormat> bundle, String 
database, String table, String filter) {
+    this.bundle = bundle;
+    this.database = database;
+    this.table = table;
+    this.filter = filter;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // need to configure the input format so the InputJobInfo is populated with
+    // the partitions to be processed. the partitions are needed to derive the
+    // input splits and to get a size estimate for the HCatSource.
+    HCatSourceTarget.configureHCatFormat(conf, bundle, database, table, filter);
+  }
+
+  @Override
+  public Iterable<HCatRecord> read(TaskInputOutputContext<?, ?, ?, ?> context) 
throws IOException {
+    return new HCatRecordDataIterable(bundle, context.getConfiguration());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java
----------------------------------------------------------------------
diff --git 
a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java
 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java
new file mode 100644
index 0000000..39a66cf
--- /dev/null
+++ 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.PartInfo;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+public class HCatSourceTarget extends HCatTarget implements 
ReadableSourceTarget<HCatRecord> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HCatSourceTarget.class);
+  private static final PType<HCatRecord> PTYPE = 
Writables.writables(HCatRecord.class);
+  private Configuration hcatConf;
+
+  private final FormatBundle<HCatInputFormat> bundle = 
FormatBundle.forInput(HCatInputFormat.class);
+  private final String database;
+  private final String table;
+  private final String filter;
+  private Table hiveTableCached;
+
+  // Default guess at the size of the data to materialize
+  private static final long DEFAULT_ESTIMATE = 1024 * 1024 * 1024;
+
+  /**
+   * Creates a new instance to read from the specified {@code table} in the
+   * {@link org.apache.hadoop.hive.metastore.MetaStoreUtils#DEFAULT_DATABASE_NAME
+   * default} database.
+   *
+   * @param table
+   *          the table to read from
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public HCatSourceTarget(String table) {
+    this(DEFAULT_DATABASE_NAME, table);
+  }
+
+  /**
+   * Creates a new instance to read from the specified {@code database} and
+   * {@code table}
+   *
+   * @param database
+   *          the database to read from
+   * @param table
+   *          the table to read from
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public HCatSourceTarget(String database, String table) {
+    this(database, table, null);
+  }
+
+  /**
+   * Creates a new instance to read from the specified {@code database} and
+   * {@code table}, restricting partitions by the specified {@code filter}. If
+   * the database isn't specified it will default to the
+   * {@link 
org.apache.hadoop.hive.metastore.MetaStoreUtils#DEFAULT_DATABASE_NAME
+   * default} database.
+   *
+   * @param database
+   *          the database to read from
+   * @param table
+   *          the table to read from
+   * @param filter
+   *          the filter to apply to find partitions
+   * @throws IllegalArgumentException if table is null or empty
+   */
+  public HCatSourceTarget(@Nullable String database, String table, String 
filter) {
+    super(database, table);
+    this.database = Strings.isNullOrEmpty(database) ? DEFAULT_DATABASE_NAME : 
database;
+    Preconditions.checkArgument(!StringUtils.isEmpty(table), "table cannot be 
null or empty");
+    this.table = table;
+    this.filter = filter;
+  }
+
+  @Override
+  public SourceTarget<HCatRecord> conf(String key, String value) {
+    // apply the setting to both the read and write sides of this source-target
+    inputConf(key, value);
+    outputConf(key, value);
+    return this;
+  }
+
+  @Override
+  public Source<HCatRecord> inputConf(String key, String value) {
+    bundle.set(key, value);
+    return this;
+  }
+
+  @Override
+  public PType<HCatRecord> getType() {
+    return PTYPE;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return PTYPE.getConverter();
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration jobConf = job.getConfiguration();
+
+    if (hcatConf == null) {
+      hcatConf = configureHCatFormat(jobConf, bundle, database, table, filter);
+    }
+
+    if (inputId == -1) {
+      job.setMapperClass(CrunchMapper.class);
+      job.setInputFormatClass(bundle.getFormatClass());
+      bundle.configure(jobConf);
+    } else {
+      Path dummy = new Path("/hcat/" + database + "/" + table);
+      CrunchInputs.addInputPath(job, dummy, bundle, inputId);
+    }
+  }
+
+  static Configuration configureHCatFormat(Configuration conf, 
FormatBundle<HCatInputFormat> bundle, String database,
+      String table, String filter) {
+    // It is tricky to get the HCatInputFormat configured correctly.
+    //
+    // The first parameter of setInput() is for both input and output.
+    // It reads Hive MetaStore's JDBC URL or HCatalog server's Thrift address,
+    // and saves the schema into the configuration for runtime needs
+    // (e.g. data location).
+    //
+    // Our solution is to create another configuration object and compare it
+    // with the original one to see what has been added.
+    Configuration newConf = new Configuration(conf);
+    try {
+      HCatInputFormat.setInput(newConf, database, table, filter);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+
+    for (Map.Entry<String, String> e : newConf) {
+      String key = e.getKey();
+      String value = e.getValue();
+      if (!Objects.equal(value, conf.get(key))) {
+        bundle.set(key, value);
+      }
+    }
+
+    return newConf;
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+
+    // This is tricky. We want to derive the size from the partitions being
+    // retrieved, but those aren't known until after the HCatInputFormat has
+    // been initialized (see #configureHCatFormat). Preferably, the input
+    // format shouldn't be configured twice, to cut down on the number of
+    // calls to Hive. getSize can be called before configureSource when the
+    // collection is being materialized or a groupBy has been performed, in
+    // which case the InputJobInfo holding the partitions won't be present
+    // yet. So configure here or in configureSource, whichever runs first,
+    // and only once.
+    if (hcatConf == null) {
+      hcatConf = configureHCatFormat(conf, bundle, database, table, filter);
+    }
+    try {
+      InputJobInfo inputJobInfo = (InputJobInfo) 
HCatUtil.deserialize(hcatConf.get(HCatConstants.HCAT_KEY_JOB_INFO));
+      List<PartInfo> partitions = inputJobInfo.getPartitions();
+
+      if (partitions.size() > 0) {
+        LOGGER.debug("Found [{}] partitions to read", partitions.size());
+        long size = 0;
+        for (final PartInfo partition : partitions) {
+          String totalSize = 
partition.getInputStorageHandlerProperties().getProperty(StatsSetupConst.TOTAL_SIZE);
+
+          if (StringUtils.isEmpty(totalSize)) {
+            long pathSize = SourceTargetHelper.getPathSize(conf, new 
Path(partition.getLocation()));
+            if (pathSize == -1) {
+              LOGGER.info("Unable to locate directory [{}]; skipping", partition.getLocation());
+              // this could be an HBase table, in which case there won't be a
+              // size estimate. if this is a valid native table partition that
+              // simply has no data, materialize won't find anything anyway
+            } else if (pathSize == 0) {
+              size += DEFAULT_ESTIMATE;
+            } else {
+              size += pathSize;
+            }
+          } else {
+            size += Long.parseLong(totalSize);
+          }
+        }
+        return size;
+      } else {
+        Table hiveTable = getHiveTable(conf);
+        LOGGER.debug("Attempting to get table size from table properties for 
table [{}]", table);
+
+        // a managed table will have the size in its properties, but this case
+        // should normally be caught above, since an unpartitioned table comes
+        // back as a single partition
+        String totalSize = 
hiveTable.getParameters().get(StatsSetupConst.TOTAL_SIZE);
+        if (!StringUtils.isEmpty(totalSize))
+          return Long.parseLong(totalSize);
+
+        // Not likely to be hit. The totalSize should have been available on
+        // the partitions returned (for unpartitioned tables a single
+        // partition is returned, referring to the entire table), or on the
+        // table metadata (only present for managed tables). If neither
+        // exists, fall back to checking the data location. Note: external
+        // tables can store data somewhere other than the root location
+        // defined by the table, since partitions can exist elsewhere;
+        // ideally that scenario is caught by the partitions > 0 branch above.
+        LOGGER.debug("Unable to find size on table properties [{}], attempting 
to get it from table data location [{}]",
+            hiveTable.getTableName(), hiveTable.getDataLocation());
+        return SourceTargetHelper.getPathSize(conf, 
hiveTable.getDataLocation());
+      }
+    } catch (IOException | TException e) {
+      LOGGER.info("Unable to determine an estimate for requested table [{}], 
using default", table, e);
+      return DEFAULT_ESTIMATE;
+    }
+  }
+
+  /**
+   * Extracts the {@link HCatSchema} from the specified {@code conf}.
+   *
+   * @param conf
+   *          the conf containing the table schema
+   * @return the HCatSchema
+   *
+   * @throws TException
+   *           if there was an issue communicating with the metastore
+   * @throws IOException
+   *           if there was an issue connecting to the metastore
+   */
+  public HCatSchema getTableSchema(Configuration conf) throws TException, 
IOException {
+    Table hiveTable = getHiveTable(conf);
+    return HCatUtil.extractSchema(hiveTable);
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration conf) {
+    LOGGER.warn("Unable to determine the last modified time for db [{}] and 
table [{}]", database, table);
+    return -1;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+    HCatSourceTarget that = (HCatSourceTarget) o;
+    return Objects.equal(this.database, that.database) && 
Objects.equal(this.table, that.table)
+        && Objects.equal(this.filter, that.filter);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(table, database, filter);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this).append("database", 
database).append("table", table).append("filter", filter)
+        .toString();
+  }
+
+  private Table getHiveTable(Configuration conf) throws IOException, 
TException {
+    if (hiveTableCached != null) {
+      return hiveTableCached;
+    }
+
+    IMetaStoreClient hiveMetastoreClient = HCatUtil.getHiveMetastoreClient(new 
HiveConf(conf, HCatSourceTarget.class));
+    hiveTableCached = HCatUtil.getTable(hiveMetastoreClient, database, table);
+    return hiveTableCached;
+  }
+
+  @Override
+  public Iterable<HCatRecord> read(Configuration conf) throws IOException {
+    if (hcatConf == null) {
+      hcatConf = configureHCatFormat(conf, bundle, database, table, filter);
+    }
+
+    return new HCatRecordDataIterable(bundle, hcatConf);
+  }
+
+  @Override
+  public ReadableData<HCatRecord> asReadable() {
+    return new HCatRecordDataReadable(bundle, database, table, filter);
+  }
+}
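
Beyond feeding a pipeline, the source-target can be queried directly; a sketch
(database and table names are hypothetical, and getTableSchema can throw
TException/IOException since it talks to the metastore):

    HCatSourceTarget source = new HCatSourceTarget("analytics", "events");
    Configuration conf = new Configuration();
    HCatSchema schema = source.getTableSchema(conf);  // column layout from the metastore
    long estimatedBytes = source.getSize(conf);       // partition-based size estimate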

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java
----------------------------------------------------------------------
diff --git 
a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java
new file mode 100644
index 0000000..114cf55
--- /dev/null
+++ 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.CrunchHCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+public class HCatTarget implements MapReduceTarget {
+
+  private static final PType<HCatRecord> PTYPE = 
Writables.writables(HCatRecord.class);
+  private static final PType<DefaultHCatRecord> DEFAULT_PTYPE = 
Writables.writables(DefaultHCatRecord.class);
+
+  private final OutputJobInfo info;
+  private final FormatBundle bundle = 
FormatBundle.forOutput(CrunchHCatOutputFormat.class);
+  private Table hiveTableCached;
+
+  /**
+   * Constructs a new instance to write to the provided Hive {@code table}
+   * name. Writes to the "default" database.
+   *
+   * Note: if the destination table is partitioned, this constructor should not
+   * be used; it is only usable for unpartitioned tables.
+   *
+   * @param table
+   *          the Hive table to write to
+   */
+  public HCatTarget(String table) {
+    this(null, table, null);
+  }
+
+  /**
+   * Constructs a new instance to write to the provided Hive {@code table}
+   * name, using the provided {@code database}. If null, the "default"
+   * database is used.
+   *
+   * Note: if the destination table is partitioned, this constructor should not
+   * be used; it is only usable for unpartitioned tables.
+   *
+   * @param database
+   *          the Hive database to use for table namespacing
+   * @param table
+   *          the Hive table to write to
+   */
+  public HCatTarget(@Nullable String database, String table) {
+    this(database, table, null);
+  }
+
+  /**
+   * Constructs a new instance to write to the provided Hive {@code table} name
+   * and {@code partitionValues}. Writes to the "default" database.
+   *
+   * Note: partitionValues will be assembled into a single directory path.
+   *
+   * For example, if the partition values are:
+   *
+   * <pre>
+   * [year, 2017],
+   * [month, 11],
+   * [day, 10]
+   *
+   * The constructed directory path will be
+   * "[dataLocationRoot]/year=2017/month=11/day=10"
+   * </pre>
+   *
+   * @param table
+   *          the Hive table to write to
+   * @param partitionValues
+   *          the partition within the table to which the data should be written
+   */
+  public HCatTarget(String table, Map<String, String> partitionValues) {
+    this(null, table, partitionValues);
+  }
+
+  /**
+   * Constructs a new instance to write to the provided {@code database},
+   * {@code table}, and the specified {@code partitionValues}. If
+   * {@code database} isn't specified, the "default" database is used.
+   *
+   * Note: partitionValues will be assembled into a single directory path.
+   *
+   * For example, if the partition values are:
+   *
+   * <pre>
+   * [year, 2017],
+   * [month, 11],
+   * [day, 10]
+   *
+   * The constructed directory path will be
+   * "[dataLocationRoot]/year=2017/month=11/day=10"
+   * </pre>
+   *
+   * @param database
+   *          the Hive database to use for table namespacing
+   * @param table
+   *          the Hive table to write to
+   * @param partitionValues
+   *          the partition within the table to which the data should be written
+   */
+  public HCatTarget(@Nullable String database, String table, @Nullable 
Map<String, String> partitionValues) {
+    this.info = OutputJobInfo.create(database, table, partitionValues);
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, 
String name) {
+    if (Strings.isNullOrEmpty(name)) {
+      throw new AssertionError("Named output wasn't generated. This shouldn't 
happen");
+    }
+    CrunchOutputs.addNamedOutput(job, name, bundle, NullWritable.class, 
HCatRecord.class);
+
+    try {
+      CrunchHCatOutputFormat.setOutput(job, info);
+
+      // set the schema into config. this would be necessary if any downstream
+      // tasks need the schema translated between a format (e.g. avro) and
+      // HCatRecord for the destination table
+      Table table = getHiveTable(job.getConfiguration());
+      CrunchHCatOutputFormat.setSchema(job, HCatUtil.extractSchema(table));
+    } catch (TException | IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public Target outputConf(String key, String value) {
+    bundle.set(key, value);
+    return this;
+  }
+
+  @Override
+  public boolean handleExisting(WriteMode writeMode, long lastModifiedAt, 
Configuration conf) {
+    return writeMode == WriteMode.DEFAULT;
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!acceptType(ptype)) {
+      return false;
+    }
+
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return ptype.getConverter();
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (acceptType(ptype))
+      return (SourceTarget<T>) new HCatSourceTarget(info.getDatabaseName(), 
info.getTableName());
+
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+            .append("database", info.getDatabaseName())
+            .append("table", info.getTableName())
+            .append("partition", info.getPartitionValues())
+            .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(info.getDatabaseName(), info.getTableName(), 
info.getPartitionValues());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+
+    HCatTarget that = (HCatTarget) o;
+    return Objects.equal(this.info.getDatabaseName(), 
that.info.getDatabaseName())
+        && Objects.equal(this.info.getTableName(), that.info.getTableName())
+        && Objects.equal(this.info.getPartitionValues(), 
that.info.getPartitionValues());
+  }
+
+  private boolean acceptType(PType<?> ptype) {
+    return Objects.equal(ptype, PTYPE) || Objects.equal(ptype, DEFAULT_PTYPE);
+  }
+
+  private Table getHiveTable(Configuration conf) throws IOException, 
TException {
+    if (hiveTableCached != null) {
+      return hiveTableCached;
+    }
+
+    IMetaStoreClient hiveMetastoreClient = HCatUtil.getHiveMetastoreClient(new 
HiveConf(conf, HCatTarget.class));
+    hiveTableCached = HCatUtil.getTable(hiveMetastoreClient, 
info.getDatabaseName(), info.getTableName());
+    return hiveTableCached;
+  }
+}
\ No newline at end of file
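
Writing through HCatTarget with explicit partition values looks roughly like
this (the database, table, and partition keys are hypothetical):

    Map<String, String> partition = ImmutableMap.of("year", "2017", "month", "11", "day", "10");
    PCollection<HCatRecord> records = ...;  // produced by earlier pipeline stages
    records.write(new HCatTarget("analytics", "events", partition));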

http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java
----------------------------------------------------------------------
diff --git 
a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java
new file mode 100644
index 0000000..76add9e
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hcatalog;
+
+import org.apache.crunch.Target;
+
+import java.util.Map;
+
+/**
+ * Static factory helper methods for writing data to HCatalog
+ *
+ * <pre>
+ * {@code
+ *  Pipeline pipeline = new MRPipeline(this.getClass());
+ *
+ *  PCollection<HCatRecord> hcatRecords = pipeline.read(FromHCat.table("this-table"));
+ *
+ *  pipeline.write(hcatRecords, ToHCat.table("that-table"));
+ * }
+ * </pre>
+ */
+public class ToHCat {
+
+  /**
+   * Constructs a new {@code Target} to write to the provided Hive
+   * {@code tableName}. Writes to the "default" database.
+   *
+   * Note: if the destination table is partitioned, this method should not
+   * be used; it is only usable for unpartitioned tables.
+   *
+   * @param tableName
+   *          the Hive table to write to
+   */
+  public static Target table(String tableName) {
+    return new HCatTarget(tableName);
+  }
+
+  /**
+   * Constructs a new {@code Target} to write to the provided Hive
+   * {@code tableName}, using the provided {@code database}. If null, the
+   * "default" database is used.
+   *
+   * Note: if the destination table is partitioned, this method should not
+   * be used; it is only usable for unpartitioned tables.
+   *
+   * @param database
+   *          the Hive database to use for table namespacing
+   * @param tableName
+   *          the Hive table to write to
+   */
+  public static Target table(String database, String tableName) {
+    return new HCatTarget(database, tableName);
+  }
+
+  /**
+   * Constructs a new {@code Target} to write to the provided Hive
+   * {@code tableName} and {@code partition}. Writes to the "default" database.
+   *
+   * Note: the partition values will be assembled into a single directory path.
+   *
+   * For example, if the partition values are:
+   *
+   * <pre>
+   * [year, 2017],
+   * [month, 11],
+   * [day, 10]
+   *
+   * The constructed directory path will be
+   * "[dataLocationRoot]/year=2017/month=11/day=10"
+   * </pre>
+   *
+   * @param tableName
+   *          the Hive table to write to
+   * @param partition
+   *          the partition within the table to which the data should be written
+   */
+  public static Target table(String tableName, Map<String, String> partition) {
+    return new HCatTarget(tableName, partition);
+  }
+
+  /**
+   * Constructs a new {@code Target} to write to the provided {@code database},
+   * {@code tableName}, and the specified {@code partition}. If
+   * {@code database} isn't specified, the "default" database is used.
+   *
+   * Note: the partition values will be assembled into a single directory path.
+   *
+   * For example, if the partition values are:
+   *
+   * <pre>
+   * [year, 2017],
+   * [month, 11],
+   * [day, 10]
+   *
+   * The constructed directory path will be
+   * "[dataLocationRoot]/year=2017/month=11/day=10"
+   * </pre>
+   *
+   * @param database
+   *          the Hive database to use for table namespacing
+   * @param tableName
+   *          the Hive table to write to
+   * @param partition
+   *          the partition within the table to which the data should be written
+   */
+  public static Target table(String database, String tableName, Map<String, 
String> partition) {
+    return new HCatTarget(database, tableName, partition);
+  }
+}
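
Putting the two helpers together, a table-to-table copy might be sketched as
follows (table names and the partition value are hypothetical):

    Pipeline pipeline = new MRPipeline(CopyJob.class, new Configuration());
    PCollection<HCatRecord> in = pipeline.read(FromHCat.table("staging", "events"));
    pipeline.write(in, ToHCat.table("warehouse", "events",
        ImmutableMap.of("dt", "20171204")));
    pipeline.done();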
