CRUNCH-340: added HCatSource & HCatTarget Signed-off-by: Josh Wills <[email protected]>
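A minimal usage sketch of the HCatalog read/write API introduced by this patch (the pipeline class, table names, partition column, and values below are illustrative only and not part of the commit):

import com.google.common.collect.ImmutableMap;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hcatalog.FromHCat;
import org.apache.crunch.io.hcatalog.ToHCat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;

public class HCatCopyExample {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(HCatCopyExample.class, new Configuration());

    // Read HCatRecords from a Hive table through HCatalog, restricted to a
    // single partition by a filter expression.
    PCollection<HCatRecord> records =
        pipeline.read(FromHCat.table("default", "source_table", "timestamp='1234'"));

    // Write the records to another HCatalog table, into the named partition.
    pipeline.write(records, ToHCat.table("default", "dest_table", ImmutableMap.of("timestamp", "1234")));

    pipeline.done();
  }
}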
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5609b014 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5609b014 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5609b014 Branch: refs/heads/master Commit: 5609b014378d3460a55ce25522f0c00659872807 Parents: 2469348 Author: Stephen Durfey <[email protected]> Authored: Mon Dec 4 10:49:59 2017 -0600 Committer: Josh Wills <[email protected]> Committed: Sun Dec 10 08:27:13 2017 -0800 ---------------------------------------------------------------------- crunch-hcatalog/pom.xml | 153 +++++++++ .../crunch/io/hcatalog/HCatSourceITSpec.java | 322 ++++++++++++++++++ .../crunch/io/hcatalog/HCatTargetITSpec.java | 308 +++++++++++++++++ .../crunch/io/hcatalog/HCatTestSuiteIT.java | 120 +++++++ .../crunch/io/hcatalog/HCatTestUtils.java | 234 +++++++++++++ .../src/it/resources/log4j.properties | 30 ++ .../org/apache/crunch/io/hcatalog/FromHCat.java | 85 +++++ .../io/hcatalog/HCatRecordDataIterable.java | 124 +++++++ .../io/hcatalog/HCatRecordDataReadable.java | 66 ++++ .../crunch/io/hcatalog/HCatSourceTarget.java | 330 +++++++++++++++++++ .../apache/crunch/io/hcatalog/HCatTarget.java | 240 ++++++++++++++ .../org/apache/crunch/io/hcatalog/ToHCat.java | 123 +++++++ .../CrunchDefaultOutputCommitterContainer.java | 63 ++++ .../CrunchDefaultOutputFormatContainer.java | 39 +++ .../CrunchFileOutputCommitterContainer.java | 92 ++++++ .../CrunchFileOutputFormatContainer.java | 52 +++ .../mapreduce/CrunchHCatOutputFormat.java | 52 +++ .../hcatalog/mapreduce/HCatMapRedUtils.java | 62 ++++ .../hive/hcatalog/mapreduce/package-info.java | 50 +++ pom.xml | 16 +- 20 files changed, 2560 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/pom.xml b/crunch-hcatalog/pom.xml new file mode 100644 index 0000000..e99814b --- /dev/null +++ b/crunch-hcatalog/pom.xml @@ -0,0 +1,153 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <artifactId>crunch-hcatalog</artifactId> + <name>Apache Crunch HCatalog Support</name> + + <dependencies> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <!-- Used by Hive in integration tests --> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <type>jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-protocol</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-shell</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-hbase</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-hbase-handler</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java ---------------------------------------------------------------------- diff --git 
a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java new file mode 100644 index 0000000..676b1fe --- /dev/null +++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatSourceITSpec.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +public class HCatSourceITSpec extends CrunchTestSupport { + + private static IMetaStoreClient client; + private static TemporaryPath temporaryPath; + private static Configuration conf; + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void setUp() throws Throwable { + HCatTestSuiteIT.startTest(); + client = HCatTestSuiteIT.getClient(); + temporaryPath = 
HCatTestSuiteIT.getRootPath(); + conf = HCatTestSuiteIT.getConf(); + } + + @AfterClass + public static void tearDown() throws Exception { + HCatTestSuiteIT.endTest(); + } + + @Test + public void testBasic() throws Exception { + String tableName = testName.getMethodName(); + Path tableRootLocation = temporaryPath.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName); + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + List<Pair<Integer, String>> mat = Lists.newArrayList( + records.parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings())) + .materialize()); + p.done(); + assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), mat); + } + + @Test + public void testReadable() throws Exception { + String tableName = testName.getMethodName(); + Path tableRootLocation = temporaryPath.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName); + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + + ReadableData<HCatRecord> readable = records.asReadable(true); + TaskInputOutputContext mockTIOC = Mockito.mock(TaskInputOutputContext.class); + when(mockTIOC.getConfiguration()).thenReturn(conf); + readable.configure(conf); + + Iterator<HCatRecord> iterator = readable.read(mockTIOC).iterator(); + HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema); + List<Pair<Integer, String>> results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(fn.map(iterator.next())); + } + + p.done(); + assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results); + } + + @Test + public void testmaterialize() throws Exception { + String tableName = testName.getMethodName(); + Path tableRootLocation = temporaryPath.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName); + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + + // force the materialize here on the HCatRecords themselves ... 
then + // transform + Iterable<HCatRecord> materialize = records.materialize(); + HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema); + List<Pair<Integer, String>> results = new ArrayList<>(); + for (final HCatRecord record : materialize) { + results.add(fn.map(record)); + } + + p.done(); + assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results); + } + + @Test + public void testMaterialize_partitionedTable_multiplePartitionsRequested() throws Exception { + String tableName = testName.getMethodName(); + Path tableRootLocation = temporaryPath.getPath(tableName); + String part1Data = "17,josh\n29,indiana\n"; + String part1Value = "1234"; + Path partition1Location = new Path(tableRootLocation, part1Value); + String part2Data = "42,jackie\n17,ohio\n"; + String part2Value = "5678"; + Path partition2Location = new Path(tableRootLocation, part2Value); + writeDataToHdfs(part1Data, partition1Location, conf); + writeDataToHdfs(part2Data, partition2Location, conf); + + FieldSchema partitionSchema = new FieldSchema(); + partitionSchema.setName("timestamp"); + partitionSchema.setType("string"); + + Table table = HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation, + Collections.singletonList(partitionSchema)); + client + .add_partition(HCatTestUtils.createPartition(table, partition1Location, Collections.singletonList(part1Value))); + client + .add_partition(HCatTestUtils.createPartition(table, partition2Location, Collections.singletonList(part2Value))); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf); + String filter = "timestamp=\"" + part1Value + "\" or timestamp=\"" + part2Value + "\""; + // HCatSource src = new HCatSource("default", tableName, filter); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table("default", tableName, filter); + + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + // force the materialize here on the HCatRecords themselves ... then + // transform + Iterable<HCatRecord> materialize = records.materialize(); + HCatTestUtils.Fns.MapPairFn fn = new HCatTestUtils.Fns.MapPairFn(schema); + List<Pair<Integer, String>> results = new ArrayList<>(); + for (final HCatRecord record : materialize) { + results.add(fn.map(record)); + } + + p.done(); + assertEquals( + ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana"), Pair.of(42, "jackie"), Pair.of(17, "ohio")), + results); + } + + @Test + public void testGroupBy() throws Exception { + String tableName = testName.getMethodName(); + Path tableRootLocation = temporaryPath.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + HCatTestUtils.createUnpartitionedTable(client, tableName, TableType.MANAGED_TABLE, tableRootLocation); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, conf); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(tableName); + + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + // can't use HCatRecord here as the intermediate output is written out by + // hadoop, and there is + // an explicit check to ensure that the type being written out matches the + // defined output type. + // e.g. 
DefaultHCatRecord != HCatRecord, therefore an exception is thrown + PTable<String, DefaultHCatRecord> table = records.parallelDo(new HCatTestUtils.Fns.GroupByHCatRecordFn(), + Writables.tableOf(Writables.strings(), Writables.writables(DefaultHCatRecord.class))); + + PTable<Integer, String> finaltable = table.groupByKey().parallelDo(new HCatTestUtils.Fns.HCatRecordMapFn(schema), + Avros.tableOf(Avros.ints(), Avros.strings())); + + List<Pair<Integer, String>> results = new ArrayList<>(); + for (final Map.Entry<Integer, String> entry : finaltable.materializeToMap().entrySet()) { + results.add(Pair.of(entry.getKey(), entry.getValue())); + } + + p.done(); + + assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), results); + } + + @Test + public void test_HCatRead_NonNativeTable_HBase() throws Exception { + HBaseTestingUtility hbaseTestUtil = null; + try { + String db = "default"; + String hiveTable = "test"; + Configuration hbaseConf = HBaseConfiguration.create(conf); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.startMiniZKCluster(); + hbaseTestUtil.startMiniHBaseCluster(1, 1); + + org.apache.hadoop.hbase.client.Table table = hbaseTestUtil.createTable(TableName.valueOf("test-table"), "fam"); + + String key1 = "this-is-a-key"; + Put put = new Put(Bytes.toBytes(key1)); + put.addColumn("fam".getBytes(), "foo".getBytes(), "17".getBytes()); + table.put(put); + String key2 = "this-is-a-key-too"; + Put put2 = new Put(Bytes.toBytes(key2)); + put2.addColumn("fam".getBytes(), "foo".getBytes(), "29".getBytes()); + table.put(put2); + table.close(); + + org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, hiveTable); + tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + tbl.setTableType(TableType.EXTERNAL_TABLE); + + FieldSchema f1 = new FieldSchema(); + f1.setName("foo"); + f1.setType("int"); + FieldSchema f2 = new FieldSchema(); + f2.setName("key"); + f2.setType("string"); + + tbl.setProperty("hbase.table.name", "test-table"); + tbl.setProperty("hbase.mapred.output.outputtable", "test-table"); + tbl.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler"); + tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + tbl.setFields(ImmutableList.of(f1, f2)); + tbl.setSerdeParam("hbase.columns.mapping", "fam:foo,:key"); + this.client.createTable(tbl.getTTable()); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, hbaseConf); + HCatSourceTarget src = (HCatSourceTarget) FromHCat.table(hiveTable); + + HCatSchema schema = src.getTableSchema(p.getConfiguration()); + PCollection<HCatRecord> records = p.read(src); + List<Pair<String, Integer>> mat = Lists.newArrayList( + records.parallelDo(new HCatTestUtils.Fns.KeyMapPairFn(schema), Avros.tableOf(Avros.strings(), Avros.ints())) + .materialize()); + + p.done(); + + assertEquals(ImmutableList.of(Pair.of(key1, 17), Pair.of(key2, 29)), mat); + } finally { + if (hbaseTestUtil != null) { + hbaseTestUtil.shutdownMiniHBaseCluster(); + hbaseTestUtil.shutdownMiniZKCluster(); + } + } + } + + // writes data to the specified location and ensures the directory exists + // prior to writing + private Path writeDataToHdfs(String data, Path location, Configuration conf) throws IOException { + FileSystem fs = location.getFileSystem(conf); + Path writeLocation = new Path(location, UUID.randomUUID().toString()); + fs.mkdirs(location); + fs.create(writeLocation); + ByteArrayInputStream baos = new 
ByteArrayInputStream(data.getBytes("UTF-8")); + try (FSDataOutputStream fos = fs.create(writeLocation)) { + IOUtils.copy(baos, fos); + } + + return writeLocation; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java new file mode 100644 index 0000000..941d2d8 --- /dev/null +++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTargetITSpec.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.thrift.TException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.ByteArrayInputStream; 
+import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static junit.framework.Assert.assertEquals; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class HCatTargetITSpec extends CrunchTestSupport { + + private static IMetaStoreClient client; + private static Configuration conf; + private static TemporaryPath tempDir; + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void setUp() throws Throwable { + HCatTestSuiteIT.startTest(); + client = HCatTestSuiteIT.getClient(); + conf = HCatTestSuiteIT.getConf(); + tempDir = HCatTestSuiteIT.getRootPath(); + } + + @AfterClass + public static void tearDown() throws Exception { + HCatTestSuiteIT.endTest(); + } + + @Test + public void test_successfulWriteToHCatTarget() throws IOException, HiveException, TException { + String tableName = testName.getMethodName(); + Path tableRootLocation = tempDir.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + + FieldSchema partitionSchema = new FieldSchema(); + partitionSchema.setName("timestamp"); + partitionSchema.setType("string"); + HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, tableRootLocation, + Lists.newArrayList(partitionSchema)); + + Pipeline pipeline = new MRPipeline(HCatSourceITSpec.class, conf); + PCollection<String> contents = pipeline.readTextFile(tableRootLocation.toString()); + PCollection<HCatRecord> hcatRecords = contents.parallelDo(new HCatTestUtils.Fns.MapHCatRecordFn(), + Writables.writables(HCatRecord.class)); + Map<String, String> partitions = new HashMap<String, String>() { + { + { + put("timestamp", "1234"); + } + } + }; + + pipeline.write(hcatRecords, ToHCat.table("default", tableName, partitions)); + pipeline.run(); + + // ensure partition was created + List<Partition> partitionList = client.listPartitions("default", tableName, (short) 5); + assertThat(partitionList.size(), is(1)); + Partition newPartition = Iterators.getOnlyElement(partitionList.iterator()); + assertThat(newPartition.getValuesIterator().next(), is("1234")); + + // read data from table to ensure it was written correctly + HCatSourceTarget source = (HCatSourceTarget) FromHCat.table("default", tableName, "timestamp='1234'"); + PCollection<HCatRecord> read = pipeline.read(source); + HCatSchema schema = source.getTableSchema(pipeline.getConfiguration()); + ArrayList<Pair<Integer, String>> mat = Lists.newArrayList( + read.parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings())) + .materialize()); + assertEquals(ImmutableList.of(Pair.of(17, "josh"), Pair.of(29, "indiana")), mat); + partitions = new HashMap<String, String>() { + { + { + put("timestamp", "5678"); + } + } + }; + + pipeline.write(read, ToHCat.table("default", tableName, partitions)); + pipeline.done(); + } + + @Test + public void test_successfulWriteToHCatTarget_GroupByKey() throws IOException, HiveException, TException { + + String tableName = testName.getMethodName(); + Path tableRootLocation = tempDir.getPath(tableName); + String data = "17,josh\n29,indiana\n"; + writeDataToHdfs(data, tableRootLocation, conf); + + FieldSchema partitionSchema = new FieldSchema(); + partitionSchema.setName("timestamp"); + partitionSchema.setType("string"); + HCatTestUtils.createTable(client, "default", tableName, TableType.EXTERNAL_TABLE, 
tableRootLocation, + Lists.newArrayList(partitionSchema)); + + Pipeline pipeline = new MRPipeline(HCatSourceITSpec.class, conf); + PCollection<String> contents = pipeline.readTextFile(tableRootLocation.toString()); + PCollection<HCatRecord> hcatRecords = contents.parallelDo(new HCatTestUtils.Fns.MapHCatRecordFn(), + Writables.writables(HCatRecord.class)); + Map<String, String> partitions = new HashMap<String, String>() { + { + { + put("timestamp", "1234"); + } + } + }; + + HCatTarget target = new HCatTarget(tableName, partitions); + + pipeline.write(hcatRecords, target); + pipeline.run(); + + // ensure partition was created + List<Partition> partitionList = client.listPartitions("default", tableName, (short) 5); + assertThat(partitionList.size(), is(1)); + Partition newPartition = Iterators.getOnlyElement(partitionList.iterator()); + assertThat(newPartition.getValuesIterator().next(), is("1234")); + + // read data from table to ensure it was written correctly + HCatSourceTarget source = (HCatSourceTarget) FromHCat.table("default", tableName, "timestamp='1234'"); + PCollection<HCatRecord> read = pipeline.read(source); + HCatSchema schema = source.getTableSchema(pipeline.getConfiguration()); + PGroupedTable<String, DefaultHCatRecord> table = read.parallelDo(new HCatTestUtils.Fns.GroupByHCatRecordFn(), + Writables.tableOf(Writables.strings(), Writables.writables(DefaultHCatRecord.class))).groupByKey(); + + Iterable<Pair<Integer, String>> mat = table + .parallelDo(new HCatTestUtils.Fns.IterableToHCatRecordMapFn(), Writables.writables(HCatRecord.class)) + .parallelDo(new HCatTestUtils.Fns.MapPairFn(schema), Avros.tableOf(Avros.ints(), Avros.strings())) + .materialize(); + + + assertEquals(ImmutableList.of(Pair.of(29, "indiana"), Pair.of(17, "josh")), ImmutableList.copyOf(mat)); + pipeline.done(); + } + + @Test + public void test_HCatTarget_WriteToNonNativeTable_HBase() throws Exception { + HBaseTestingUtility hbaseTestUtil = null; + try { + String db = "default"; + String sourceHiveTable = "source_table"; + String destinationHiveTable = "dest_table"; + Configuration configuration = HBaseConfiguration.create(conf); + hbaseTestUtil = new HBaseTestingUtility(configuration); + hbaseTestUtil.startMiniZKCluster(); + hbaseTestUtil.startMiniHBaseCluster(1, 1); + + org.apache.hadoop.hbase.client.Table sourceTable = hbaseTestUtil.createTable(TableName.valueOf(sourceHiveTable), + "fam"); + + String key1 = "this-is-a-key"; + Put put = new Put(Bytes.toBytes(key1)); + put.addColumn("fam".getBytes(), "foo".getBytes(), "17".getBytes()); + sourceTable.put(put); + String key2 = "this-is-a-key-too"; + Put put2 = new Put(Bytes.toBytes(key2)); + put2.addColumn("fam".getBytes(), "foo".getBytes(), "29".getBytes()); + sourceTable.put(put2); + sourceTable.close(); + + // create Hive Table for source table + org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, sourceHiveTable); + tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + tbl.setTableType(TableType.EXTERNAL_TABLE); + + FieldSchema f1 = new FieldSchema(); + f1.setName("foo"); + f1.setType("int"); + FieldSchema f2 = new FieldSchema(); + f2.setName("key"); + f2.setType("string"); + + tbl.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler"); + tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + tbl.setFields(ImmutableList.of(f1, f2)); + tbl.setSerdeParam("hbase.columns.mapping", "fam:foo,:key"); + this.client.createTable(tbl.getTTable()); + + 
// creates destination table + hbaseTestUtil.createTable(TableName.valueOf(destinationHiveTable), "fam"); + org.apache.hadoop.hive.ql.metadata.Table destTable = new org.apache.hadoop.hive.ql.metadata.Table(db, + destinationHiveTable); + destTable.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + destTable.setTableType(TableType.EXTERNAL_TABLE); + + destTable.setProperty("storage_handler", "org.apache.hadoop.hive.hbase.HBaseStorageHandler"); + destTable.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + destTable.setFields(ImmutableList.of(f1, f2)); + destTable.setSerdeParam("hbase.columns.mapping", "fam:foo,:key"); + this.client.createTable(destTable.getTTable()); + + Pipeline p = new MRPipeline(HCatSourceITSpec.class, configuration); + PCollection<HCatRecord> records = p.read(FromHCat.table(sourceHiveTable)); + p.write(records, ToHCat.table(destinationHiveTable)); + p.done(); + + Connection connection = null; + try { + Scan scan = new Scan(); + + connection = ConnectionFactory.createConnection(configuration); + org.apache.hadoop.hbase.client.Table table = connection.getTable(TableName.valueOf(destinationHiveTable)); + ResultScanner scanner = table.getScanner(scan); + + Result result = null; + List<Pair<String, Integer>> actual = new ArrayList<>(); + while (((result = scanner.next()) != null)) { + String value = Bytes.toString(result.getValue("fam".getBytes(), "foo".getBytes())); + actual.add(Pair.of(Bytes.toString(result.getRow()), Integer.parseInt(value))); + } + + Assert.assertEquals(ImmutableList.of(Pair.of(key1, 17), Pair.of(key2, 29)), actual); + } finally { + IOUtils.closeQuietly(connection); + } + } finally { + if (hbaseTestUtil != null) { + hbaseTestUtil.shutdownMiniHBaseCluster(); + hbaseTestUtil.shutdownMiniZKCluster(); + } + } + } + + // writes data to the specified location and ensures the directory exists + // prior to writing + private Path writeDataToHdfs(String data, Path location, Configuration conf) throws IOException { + FileSystem fs = location.getFileSystem(conf); + Path writeLocation = new Path(location, UUID.randomUUID().toString()); + fs.mkdirs(location); + fs.create(writeLocation); + ByteArrayInputStream baos = new ByteArrayInputStream(data.getBytes("UTF-8")); + try (FSDataOutputStream fos = fs.create(writeLocation)) { + IOUtils.copy(baos, fos); + } + + return writeLocation; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java new file mode 100644 index 0000000..b2f41d9 --- /dev/null +++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestSuiteIT.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import org.apache.crunch.test.TemporaryPath; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.UUID; + +/** + * Test suite to re-use the same Hive metastore instance for all tests in the + * suite + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ HCatSourceITSpec.class, HCatTargetITSpec.class }) +public class HCatTestSuiteIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(HCatTestSuiteIT.class); + private static boolean runAsSuite = false; + + public static TemporaryPath hadoopTempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir"); + + static HiveConf hconf; + static IMetaStoreClient client; + static Configuration conf = null; + + @BeforeClass + public static void startSuite() throws Exception { + runAsSuite = true; + setupFileSystem(); + setupMetaStore(); + } + + @AfterClass + public static void endSuite() throws Exception { + cleanup(); + } + + public static Configuration getConf() { + return conf; + } + + public static TemporaryPath getRootPath() { + return hadoopTempDir; + } + + public static IMetaStoreClient getClient() { + return client; + } + + private static void setupMetaStore() throws Exception { + conf = hadoopTempDir.getDefaultConfiguration(); + // set the warehouse location to the location of the temp dir, so managed + // tables return a size estimate of the table + String databaseLocation = hadoopTempDir.getPath("metastore_db").toString(); + String derbyLocation = hadoopTempDir.getPath("derby.log").toString(); + String jdbcUrl = "jdbc:derby:;databaseName=" + databaseLocation + ";create=true"; + conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, jdbcUrl); + conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), hadoopTempDir.getRootPath().toString()); + // allow HMS to create any tables necessary + conf.set("datanucleus.schema.autoCreateTables", "true"); + // disable verification as the tables won't exist at startup + conf.set("hive.metastore.schema.verification", "false"); + // write derby logs to the temp directory to be cleaned up automagically after the test runs + System.setProperty("derby.stream.error.file", derbyLocation); + hconf = HCatUtil.getHiveConf(conf); + client = HCatUtil.getHiveMetastoreClient(hconf); + } + + private static void setupFileSystem() throws Exception { + try { + hadoopTempDir.create(); + } catch (Throwable throwable) { + throw (Exception) throwable; + } + } + + public static void startTest() throws Exception { + if (!runAsSuite) { + setupFileSystem(); + setupMetaStore(); + } + } + + public static void endTest() throws Exception { + if (!runAsSuite) { + cleanup(); + } + } + + private static void cleanup() throws IOException { + hadoopTempDir.delete(); + client.close(); + }
+} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java new file mode 100644 index 0000000..0b047ea --- /dev/null +++ b/crunch-hcatalog/src/it/java/org/apache/crunch/io/hcatalog/HCatTestUtils.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +public class HCatTestUtils { + + public static class Fns { + + /** + * Maps an HCatRecord with a Key to a pair of the Key and the value of the + * column "foo" + */ + public static class KeyMapPairFn extends MapFn<HCatRecord, Pair<String, Integer>> { + + private HCatSchema schema; + + public KeyMapPairFn(HCatSchema schema) { + this.schema = schema; + } + + @Override + public Pair<String, Integer> map(HCatRecord input) { + try { + return Pair.of(input.getString("key", schema), input.getInteger("foo", schema)); + } catch (HCatException e) { + throw new CrunchRuntimeException(e); + } + } + } + + /** + * Takes an HCatRecord and emits a Pair<Integer, String>. 
Assumes the + * columns in the record are "foo" (int) and "bar" (string) + */ + public static class MapPairFn extends MapFn<HCatRecord, Pair<Integer, String>> { + + private HCatSchema schema; + + public MapPairFn(HCatSchema schema) { + this.schema = schema; + } + + @Override + public Pair<Integer, String> map(HCatRecord input) { + try { + return Pair.of(input.getInteger("foo", schema), input.getString("bar", schema)); + } catch (HCatException e) { + throw new CrunchRuntimeException(e); + } + } + } + + /** + * Simple MapFn that maps the input record to a Pair, with the first + * element being "record". Useful when testing a group-by where the value + * is an HCatRecord. + */ + public static class GroupByHCatRecordFn extends MapFn<HCatRecord, Pair<String, DefaultHCatRecord>> { + + @Override + public Pair<String, DefaultHCatRecord> map(HCatRecord input) { + return Pair.of("record", (DefaultHCatRecord) input); + } + } + + /** + * Takes the input iterable of DefaultHCatRecords and emits Pairs that + * contain the value of the columns "foo" and "bar". + */ + public static class HCatRecordMapFn extends DoFn<Pair<String, Iterable<DefaultHCatRecord>>, Pair<Integer, String>> { + + private HCatSchema schema; + + public HCatRecordMapFn(HCatSchema schema) { + this.schema = schema; + } + + @Override + public void process(Pair<String, Iterable<DefaultHCatRecord>> input, Emitter<Pair<Integer, String>> emitter) { + for (final HCatRecord record : input.second()) { + try { + emitter.emit(Pair.of(record.getInteger("foo", schema), record.getString("bar", schema))); + } catch (HCatException e) { + throw new CrunchRuntimeException(e); + } + } + } + } + + /** + * Takes a CSV line and maps it into an HCatRecord + */ + public static class MapHCatRecordFn extends MapFn<String, HCatRecord> { + + static HCatSchema dataSchema; + + @Override + public void initialize() { + try { + dataSchema = HCatOutputFormat.getTableSchema(getConfiguration()); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public HCatRecord map(String input) { + try { + return getHCatRecord(input.split(",")); + } catch (HCatException e) { + throw new CrunchRuntimeException(e); + } + } + + private static HCatRecord getHCatRecord(String[] csvParts) throws HCatException { + // must be set, or all subsequent sets on HCatRecord will fail. 
setting + // the size + // initializes the initial backing array + DefaultHCatRecord hcatRecord = new DefaultHCatRecord(dataSchema.size()); + + hcatRecord.set("foo", dataSchema, Integer.parseInt(csvParts[0])); + hcatRecord.set("bar", dataSchema, csvParts[1]); + + return hcatRecord; + } + } + + /** + * Takes an iterable of HCatRecords and emits each HCatRecord (turns a + * PTable into a PCollection) + */ + public static class IterableToHCatRecordMapFn extends DoFn<Pair<String, Iterable<DefaultHCatRecord>>, HCatRecord> { + + @Override + public void process(Pair<String, Iterable<DefaultHCatRecord>> input, Emitter<HCatRecord> emitter) { + for (final HCatRecord record : input.second()) { + emitter.emit(record); + } + } + } + } + + public static Table createUnpartitionedTable(IMetaStoreClient client, String tableName, TableType type) + throws IOException, HiveException, TException { + return createTable(client, "default", tableName, type, null, Collections.<FieldSchema> emptyList()); + } + + public static Table createUnpartitionedTable(IMetaStoreClient client, String tableName, TableType type, + @Nullable Path datalocation) throws IOException, HiveException, TException { + return createTable(client, "default", tableName, type, datalocation, Collections.<FieldSchema> emptyList()); + } + + public static Table createTable(IMetaStoreClient client, String db, String tableName, TableType type, + @Nullable Path datalocation, List<FieldSchema> partCols) throws IOException, HiveException, TException { + org.apache.hadoop.hive.ql.metadata.Table tbl = new org.apache.hadoop.hive.ql.metadata.Table(db, tableName); + tbl.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + tbl.setTableType(type); + + if (datalocation != null) + tbl.setDataLocation(datalocation); + + FieldSchema f1 = new FieldSchema(); + f1.setName("foo"); + f1.setType("int"); + FieldSchema f2 = new FieldSchema(); + f2.setName("bar"); + f2.setType("string"); + + if (partCols != null && !partCols.isEmpty()) + tbl.setPartCols(partCols); + + tbl.setFields(ImmutableList.of(f1, f2)); + tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + tbl.setSerdeParam("field.delim", ","); + tbl.setSerdeParam("serialization.format", ","); + tbl.setInputFormatClass("org.apache.hadoop.mapred.TextInputFormat"); + tbl.setOutputFormatClass("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"); + client.createTable(tbl.getTTable()); + + return client.getTable(db, tableName); + } + + public static Partition createPartition(Table table, Path partLocation, List<String> partValues) { + Partition partition = new Partition(); + partition.setDbName(table.getDbName()); + partition.setTableName(table.getTableName()); + partition.setSd(new StorageDescriptor(table.getSd())); + partition.setValues(partValues); + partition.getSd().setLocation(partLocation.toString()); + return partition; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/it/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/it/resources/log4j.properties b/crunch-hcatalog/src/it/resources/log4j.properties new file mode 100644 index 0000000..bb987d6 --- /dev/null +++ b/crunch-hcatalog/src/it/resources/log4j.properties @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ***** Set root logger level to INFO and its only appender to A. +log4j.logger.org.apache.crunch=info, A + +# Log warnings on Hadoop for the local runner when testing +log4j.logger.org.apache.hadoop=warn, A +# Except for Configuration, which is chatty. +log4j.logger.org.apache.hadoop.conf.Configuration=error, A + +# ***** A is set to be a ConsoleAppender. +log4j.appender.A=org.apache.log4j.ConsoleAppender +log4j.appender.A.org.apache.derby=debug +# ***** A uses PatternLayout. +log4j.appender.A.layout=org.apache.log4j.PatternLayout +log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java new file mode 100644 index 0000000..3988f00 --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/FromHCat.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import org.apache.crunch.Source; +import org.apache.hive.hcatalog.data.HCatRecord; + +import javax.annotation.Nullable; + +/** + * Static factory methods for creating sources to read from HCatalog. + * + * Access examples: + * <pre> + * {@code + * + * Pipeline pipeline = new MRPipeline(this.getClass()); + * + * PCollection<HCatRecord> hcatRecords = pipeline.read(FromHCat.table("my-table")) + * } + * </pre> + */ +public final class FromHCat { + + private FromHCat() { + } + + /** + * Creates a {@code Source<HCatRecord>} instance from a Hive table in the + * default database instance "default". + * + * @param table + * table name + * @throws IllegalArgumentException if table is null or empty + */ + public static Source<HCatRecord> table(String table) { + return new HCatSourceTarget(table); + } + + /** + * Creates a {@code Source<HCatRecord>} instance from a Hive table. + * + * @param database + * database name + * @param table + * table name + * @throws IllegalArgumentException if table is null or empty + */ + public static Source<HCatRecord> table(String database, String table) { + return new HCatSourceTarget(database, table); + } + + /** + * Creates a {@code Source<HCatRecord>} instance from a Hive table with custom + * filter criteria. If {@code database} is null, uses the default + * database instance "default". + * + * @param database
 * database name + * @param table + * table name + * @param filter + * custom filter criteria, e.g. selecting partitions by + * {@code 'date= "20140424"'} or {@code 'date < "20140424"'} + * @throws IllegalArgumentException if table is null or empty + */ + public static Source<HCatRecord> table(@Nullable String database, String table, String filter) { + return new HCatSourceTarget(database, table, filter); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java new file mode 100644 index 0000000..0f1d360 --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataIterable.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.io.FormatBundle; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +public class HCatRecordDataIterable implements Iterable<HCatRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(HCatRecordDataIterable.class); + + private final FormatBundle<HCatInputFormat> bundle; + private final Configuration conf; + + public HCatRecordDataIterable(FormatBundle<HCatInputFormat> bundle, Configuration configuration) { + this.bundle = bundle; + this.conf = configuration; + } + + @Override + public Iterator<HCatRecord> iterator() { + try { + Job job = Job.getInstance(bundle.configure(conf)); + + final InputFormat fmt = ReflectionUtils.newInstance(bundle.getFormatClass(), conf); + final TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + return Iterators.concat(Lists.transform(fmt.getSplits(job), new Function<InputSplit, Iterator<HCatRecord>>() { + + @Override + public Iterator<HCatRecord> apply(InputSplit split) { + RecordReader reader = null; + try { + reader = fmt.createRecordReader(split, ctxt); + reader.initialize(split, ctxt); + } catch (IOException | InterruptedException e) { + throw new CrunchRuntimeException(e); + } + return new HCatRecordReaderIterator(reader); + } + }).iterator()); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + private static class HCatRecordReaderIterator<T> implements Iterator<T> { + + private final RecordReader<WritableComparable, T> reader; + private boolean hasNext; + private T current; + + public HCatRecordReaderIterator(RecordReader reader) { + this.reader = reader; + try { + hasNext = reader.nextKeyValue(); + if (hasNext) + current = this.reader.getCurrentValue(); + } catch (IOException | InterruptedException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public T next() { + T ret = current; + try { + hasNext = reader.nextKeyValue(); + + if (hasNext) { + current = reader.getCurrentValue(); + } + } catch (IOException | InterruptedException e) { + throw new CrunchRuntimeException(e); + } + return ret; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Removing elements is not supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java 
b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java new file mode 100644 index 0000000..e4d4225 --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatRecordDataReadable.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FormatBundle; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public class HCatRecordDataReadable implements ReadableData<HCatRecord> { + + private final FormatBundle<HCatInputFormat> bundle; + private final String database; + private final String table; + private final String filter; + + public HCatRecordDataReadable(FormatBundle<HCatInputFormat> bundle, String database, String table, String filter) { + this.bundle = bundle; + this.database = database; + this.table = table; + this.filter = filter; + } + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + return ImmutableSet.of(); + } + + @Override + public void configure(Configuration conf) { + // need to configure the input format, so the JobInputInfo is populated with + // the partitions to be processed. the partitions are needed to derive the + // input splits and to get a size estimate for the HCatSource. + HCatSourceTarget.configureHCatFormat(conf, bundle, database, table, filter); + } + + @Override + public Iterable<HCatRecord> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException { + return new HCatRecordDataIterable(bundle, context.getConfiguration()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java new file mode 100644 index 0000000..39a66cf --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatSourceTarget.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.ReadableData; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.SourceTargetHelper; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +public class HCatSourceTarget extends HCatTarget implements ReadableSourceTarget<HCatRecord> { + + private static final Logger LOGGER = LoggerFactory.getLogger(HCatSourceTarget.class); + private static final PType<HCatRecord> PTYPE = Writables.writables(HCatRecord.class); + private Configuration hcatConf; + + private final FormatBundle<HCatInputFormat> bundle = FormatBundle.forInput(HCatInputFormat.class); + private final String database; + private final String table; + private final String filter; + private Table hiveTableCached; + + // Default guess at the size of the data to materialize + private static final long DEFAULT_ESTIMATE = 1024 * 1024 * 1024; + + /** + * Creates a new instance to read from the specified {@code table} and the + * {@link org.apache.hadoop.hive.metastore.MetaStoreUtils#DEFAULT_DATABASE_NAME + * default} database + * + * @param table + * @throw IllegalArgumentException if table is null or empty + */ + public HCatSourceTarget(String table) { + this(DEFAULT_DATABASE_NAME, table); + } + + 
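[Editorial usage sketch, not part of this patch.] The single-argument constructor above reads a table out of the "default" database. A minimal sketch; the driver class and table name are assumptions:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.hcatalog.HCatSourceTarget;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class ReadDefaultDbSketch {
      public static void main(String[] args) {
        Pipeline pipeline = new MRPipeline(ReadDefaultDbSketch.class, new Configuration());

        // Equivalent to new HCatSourceTarget("default", "web_logs").
        PCollection<HCatRecord> records = pipeline.read(new HCatSourceTarget("web_logs"));

        System.out.println("read " + records.getName());
        pipeline.done();
      }
    }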
/** + * Creates a new instance to read from the specified {@code database} and + * {@code table} + * + * @param database + * the database to read from + * @param table + * the table to read from + * @throw IllegalArgumentException if table is null or empty + */ + public HCatSourceTarget(String database, String table) { + this(database, table, null); + } + + /** + * Creates a new instance to read from the specified {@code database} and + * {@code table}, restricting partitions by the specified {@code filter}. If + * the database isn't specified it will default to the + * {@link org.apache.hadoop.hive.metastore.MetaStoreUtils#DEFAULT_DATABASE_NAME + * default} database. + * + * @param database + * the database to read from + * @param table + * the table to read from + * @param filter + * the filter to apply to find partitions + * @throw IllegalArgumentException if table is null or empty + */ + public HCatSourceTarget(@Nullable String database, String table, String filter) { + super(database, table); + this.database = Strings.isNullOrEmpty(database) ? DEFAULT_DATABASE_NAME : database; + Preconditions.checkArgument(!StringUtils.isEmpty(table), "table cannot be null or empty"); + this.table = table; + this.filter = filter; + } + + @Override + public SourceTarget<HCatRecord> conf(String key, String value) { + return null; + } + + @Override + public Source<HCatRecord> inputConf(String key, String value) { + bundle.set(key, value); + return this; + } + + @Override + public PType<HCatRecord> getType() { + return PTYPE; + } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return PTYPE.getConverter(); + } + + @Override + public void configureSource(Job job, int inputId) throws IOException { + Configuration jobConf = job.getConfiguration(); + + if (hcatConf == null) { + hcatConf = configureHCatFormat(jobConf, bundle, database, table, filter); + } + + if (inputId == -1) { + job.setMapperClass(CrunchMapper.class); + job.setInputFormatClass(bundle.getFormatClass()); + bundle.configure(jobConf); + } else { + Path dummy = new Path("/hcat/" + database + "/" + table); + CrunchInputs.addInputPath(job, dummy, bundle, inputId); + } + } + + static Configuration configureHCatFormat(Configuration conf, FormatBundle<HCatInputFormat> bundle, String database, + String table, String filter) { + // It is tricky to get the HCatInputFormat configured correctly. + // + // The first parameter of setInput() is for both input and output. + // It reads Hive MetaStore's JDBC URL or HCatalog server's Thrift address, + // and saves the schema into the configuration for runtime needs + // (e.g. data location). + // + // Our solution is to create another configuration object, and + // compares with the original one to see what has been added. + Configuration newConf = new Configuration(conf); + try { + HCatInputFormat.setInput(newConf, database, table, filter); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + + for (Map.Entry<String, String> e : newConf) { + String key = e.getKey(); + String value = e.getValue(); + if (!Objects.equal(value, conf.get(key))) { + bundle.set(key, value); + } + } + + return newConf; + } + + @Override + public long getSize(Configuration conf) { + + // this is tricky. we want to derive the size by the partitions being + // retrieved. these aren't known until after the HCatInputFormat has + // been initialized (see #configureHCatFormat). preferably, the input + // format shouldn't be configured twice to cut down on the number of calls + // to hive. 
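[Editorial usage sketch, not part of this patch.] The inputConf()/configureHCatFormat() pair above keeps per-source settings in this source's FormatBundle rather than in the job-wide Configuration, which is what lets several differently configured HCatalog sources coexist in one job (see the CrunchInputs branch of configureSource). A sketch of setting a per-source property; the database, table, and property key below are illustrative assumptions:

    import org.apache.crunch.Source;
    import org.apache.crunch.io.hcatalog.HCatSourceTarget;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class PerSourceConfSketch {
      public static void main(String[] args) {
        // The key/value lands in this source's FormatBundle; configureHCatFormat() later merges in
        // only the keys that HCatInputFormat.setInput() changed, so nothing leaks job-wide.
        Source<HCatRecord> source = new HCatSourceTarget("warehouse", "web_logs")
            .inputConf("hcat.desired.partition.num.splits", "4"); // example key only
        System.out.println(source);
      }
    }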
getSize can be called before configureSource is called when the + // collection is being materialized or a groupby has been performed. so, the + // InputJobInfo, which has the partitions, won't be present when this + // happens. so, configure here or in configureSource just once. + if (hcatConf == null) { + hcatConf = configureHCatFormat(conf, bundle, database, table, filter); + } + try { + InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(hcatConf.get(HCatConstants.HCAT_KEY_JOB_INFO)); + List<PartInfo> partitions = inputJobInfo.getPartitions(); + + if (partitions.size() > 0) { + LOGGER.debug("Found [{}] partitions to read", partitions.size()); + long size = 0; + for (final PartInfo partition : partitions) { + String totalSize = partition.getInputStorageHandlerProperties().getProperty(StatsSetupConst.TOTAL_SIZE); + + if (StringUtils.isEmpty(totalSize)) { + long pathSize = SourceTargetHelper.getPathSize(conf, new Path(partition.getLocation())); + if (pathSize == -1) { + LOGGER.info("Unable to locate directory [{}]; skipping", partition.getLocation()); + // could be an hbase table, in which there won't be a size + // estimate if this is a valid native table partition, but no + // data, materialize won't find anything + } else if (pathSize == 0) { + size += DEFAULT_ESTIMATE; + } else { + size += pathSize; + } + } else { + size += Long.parseLong(totalSize); + } + } + return size; + } else { + Table hiveTable = getHiveTable(conf); + LOGGER.debug("Attempting to get table size from table properties for table [{}]", table); + + // managed table will have the size on it, but should be caught as a + // partition.size == 1 if the table isn't partitioned + String totalSize = hiveTable.getParameters().get(StatsSetupConst.TOTAL_SIZE); + if (!StringUtils.isEmpty(totalSize)) + return Long.parseLong(totalSize); + + // not likely to be hit. the totalSize should have been available on the + // partitions returned (for unpartitioned tables one partition will be + // returned, referring to the entire table), or on the table metadata + // (only there for managed tables). if neither existed, then check + // against the data location as backup. note: external tables can be + // somewhere other than the root location as defined by the table, + // as partitions can exist elsewhere. ideally this scenario is caught + // by the if statement with partitions > 0 + LOGGER.debug("Unable to find size on table properties [{}], attempting to get it from table data location [{}]", + hiveTable.getTableName(), hiveTable.getDataLocation()); + return SourceTargetHelper.getPathSize(conf, hiveTable.getDataLocation()); + } + } catch (IOException | TException e) { + LOGGER.info("Unable to determine an estimate for requested table [{}], using default", table, e); + return DEFAULT_ESTIMATE; + } + } + + /** + * Extracts the {@link HCatSchema} from the specified {@code conf}. 
+ * + * @param conf + * the conf containing the table schema + * @return the HCatSchema + * + * @throws TException + * if there was an issue communicating with the metastore + * @throws IOException + * if there was an issue connecting to the metastore + */ + public HCatSchema getTableSchema(Configuration conf) throws TException, IOException { + Table hiveTable = getHiveTable(conf); + return HCatUtil.extractSchema(hiveTable); + } + + @Override + public long getLastModifiedAt(Configuration conf) { + LOGGER.warn("Unable to determine the last modified time for db [{}] and table [{}]", database, table); + return -1; + } + + @Override + public boolean equals(Object o) { + if (o == null || !getClass().equals(o.getClass())) { + return false; + } + HCatSourceTarget that = (HCatSourceTarget) o; + return Objects.equal(this.database, that.database) && Objects.equal(this.table, that.table) + && Objects.equal(this.filter, that.filter); + } + + @Override + public int hashCode() { + return Objects.hashCode(table, database, filter); + } + + @Override + public String toString() { + return new ToStringBuilder(this).append("database", database).append("table", table).append("filter", filter) + .toString(); + } + + private Table getHiveTable(Configuration conf) throws IOException, TException { + if (hiveTableCached != null) { + return hiveTableCached; + } + + IMetaStoreClient hiveMetastoreClient = HCatUtil.getHiveMetastoreClient(new HiveConf(conf, HCatSourceTarget.class)); + hiveTableCached = HCatUtil.getTable(hiveMetastoreClient, database, table); + return hiveTableCached; + } + + @Override + public Iterable<HCatRecord> read(Configuration conf) throws IOException { + if (hcatConf == null) { + hcatConf = configureHCatFormat(conf, bundle, database, table, filter); + } + + return new HCatRecordDataIterable(bundle, hcatConf); + } + + @Override + public ReadableData<HCatRecord> asReadable() { + return new HCatRecordDataReadable(bundle, database, table, filter); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java new file mode 100644 index 0000000..114cf55 --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/HCatTarget.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
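[Editorial usage sketch, not part of this patch.] Outside of a full MapReduce run, HCatSourceTarget can also be used directly from client code: getTableSchema() pulls the schema from the metastore, getSize() applies the estimation order described above (partition totalSize statistics, then file sizes, then the 1 GiB default), and read() iterates the matching partitions in-process. A sketch; the database, table, and filter values are assumptions:

    import org.apache.crunch.io.hcatalog.HCatSourceTarget;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;

    public class InspectTableSketch {
      public static void main(String[] args) throws Exception {
        // Assumes hive-site.xml (metastore connection details) is on the classpath.
        Configuration conf = new Configuration();
        HCatSourceTarget source = new HCatSourceTarget("warehouse", "web_logs", "dt=\"2017-12-04\"");

        HCatSchema schema = source.getTableSchema(conf);
        System.out.println("columns: " + schema.getFieldNames());

        System.out.println("estimated bytes: " + source.getSize(conf));

        for (HCatRecord record : source.read(conf)) {
          System.out.println(record.getAll());
        }
      }
    }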
+ */ +package org.apache.crunch.io.hcatalog; + +import com.google.common.base.Objects; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; +import org.apache.crunch.io.CrunchOutputs; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.MapReduceTarget; +import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.mapreduce.CrunchHCatOutputFormat; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nullable; + +public class HCatTarget implements MapReduceTarget { + + private static final PType<HCatRecord> PTYPE = Writables.writables(HCatRecord.class); + private static final PType<DefaultHCatRecord> DEFAULT_PTYPE = Writables.writables(DefaultHCatRecord.class); + + private final OutputJobInfo info; + private final FormatBundle bundle = FormatBundle.forOutput(CrunchHCatOutputFormat.class); + private Table hiveTableCached; + + /** + * Constructs a new instance to write to the provided hive {@code table} name. + * Writes to the "default" database. + * + * Note: if the destination table is partitioned, this constructor should not + * be used. It will only be usable by unpartitioned tables + * + * @param table + * the hive table to write to + */ + public HCatTarget(String table) { + this(null, table, null); + } + + /** + * Constructs a new instance to write to the provided hive {@code table} name, + * using the provided {@code database}. If null, uses "default" database. + * + * Note: if the destination table is partitioned, this constructor should not + * be used. It will only be usable by unpartitioned tables + * + * @param database + * the hive database to use for table namespacing + * @param table + * the hive table to write to + */ + public HCatTarget(@Nullable String database, String table) { + this(database, table, null); + } + + /** + * Constructs a new instance to write to the provided hive {@code table} name + * and {@code partitionValues}. Writes to the "default" database. + * + * Note: partitionValues will be assembled into a single directory path. 
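[Editorial usage sketch, not part of this patch.] HCatTarget only accepts collections whose PType is the writable HCatRecord or DefaultHCatRecord type (see acceptType() further down in this file), so upstream data is typically mapped into that shape before the write. A sketch against an unpartitioned destination table; the table name, column layout, and input collection are assumptions:

    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hcatalog.HCatTarget;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hive.hcatalog.data.DefaultHCatRecord;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class UnpartitionedWriteSketch {
      // Maps plain strings into single-column HCatRecords so the target's PType check passes.
      static PCollection<HCatRecord> toRecords(PCollection<String> words) {
        return words.parallelDo(new MapFn<String, HCatRecord>() {
          @Override
          public HCatRecord map(String word) {
            DefaultHCatRecord record = new DefaultHCatRecord(1); // assumes a one-column table
            record.set(0, word);
            return record;
          }
        }, Writables.writables(HCatRecord.class));
      }

      static void write(Pipeline pipeline, PCollection<String> words) {
        // Unpartitioned table in the "default" database; partitioned tables need the
        // partition-values constructors shown below.
        pipeline.write(toRecords(words), new HCatTarget("that-table"));
      }
    }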
+ * + * For example, if the partition values are: + * + * <pre> + * [year, 2017], + * [month,11], + * [day, 10] + * + * The constructed directory path will be + * "[dataLocationRoot]/year=2017/month=11/day=10" + * </pre> + * + * @param table + * the hive table to write to + * @param partitionValues + * the partition within the table it should be written + */ + public HCatTarget(String table, Map<String, String> partitionValues) { + this(null, table, partitionValues); + } + + /** + * Constructs a new instance to write to the provided {@code database}, + * {@code table}, and to the specified {@code partitionValues}. If + * {@code database} isn't specified, the "default" database is used + * + * Note: partitionValues will be assembled into a single directory path. + * + * For example, if the partition values are: + * + * <pre> + * [year, 2017], + * [month,11], + * [day, 10] + * + * The constructed directory path will be + * "[dataLocationRoot]/year=2017/month=11/day=10" + * </pre> + * + * @param database + * the hive database to use for table namespacing + * @param table + * the hive table to write to + * @param partitionValues + * the partition within the table it should be written + */ + public HCatTarget(@Nullable String database, String table, @Nullable Map<String, String> partitionValues) { + this.info = OutputJobInfo.create(database, table, partitionValues); + } + + @Override + public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { + if (Strings.isNullOrEmpty(name)) { + throw new AssertionError("Named output wasn't generated. This shouldn't happen"); + } + CrunchOutputs.addNamedOutput(job, name, bundle, NullWritable.class, HCatRecord.class); + + try { + CrunchHCatOutputFormat.setOutput(job, info); + + // set the schema into config. this would be necessary if any downstream + // tasks need the schema translated between a format (e.g. 
avro) and + // HCatRecord for the destination table + Table table = getHiveTable(job.getConfiguration()); + CrunchHCatOutputFormat.setSchema(job, HCatUtil.extractSchema(table)); + } catch (TException | IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public Target outputConf(String key, String value) { + bundle.set(key, value); + return this; + } + + @Override + public boolean handleExisting(WriteMode writeMode, long lastModifiedAt, Configuration conf) { + return writeMode == WriteMode.DEFAULT; + } + + @Override + public boolean accept(OutputHandler handler, PType<?> ptype) { + if (!acceptType(ptype)) { + return false; + } + + handler.configure(this, ptype); + return true; + } + + @Override + public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { + return ptype.getConverter(); + } + + @Override + public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { + if (acceptType(ptype)) + return (SourceTarget<T>) new HCatSourceTarget(info.getDatabaseName(), info.getTableName()); + + return null; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("database", info.getDatabaseName()) + .append("table", info.getTableName()) + .append("partition", info.getPartitionValues()) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hashCode(info.getDatabaseName(), info.getTableName(), info.getPartitionValues()); + } + + @Override + public boolean equals(Object o) { + if (o == null || !getClass().equals(o.getClass())) { + return false; + } + + HCatTarget that = (HCatTarget) o; + return Objects.equal(this.info.getDatabaseName(), that.info.getDatabaseName()) + && Objects.equal(this.info.getTableName(), that.info.getTableName()) + && Objects.equal(this.info.getPartitionValues(), that.info.getPartitionValues()); + } + + private boolean acceptType(PType<?> ptype) { + return Objects.equal(ptype, PTYPE) || Objects.equal(ptype, DEFAULT_PTYPE); + } + + private Table getHiveTable(Configuration conf) throws IOException, TException { + if (hiveTableCached != null) { + return hiveTableCached; + } + + IMetaStoreClient hiveMetastoreClient = HCatUtil.getHiveMetastoreClient(new HiveConf(conf, HCatTarget.class)); + hiveTableCached = HCatUtil.getTable(hiveMetastoreClient, info.getDatabaseName(), info.getTableName()); + return hiveTableCached; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java new file mode 100644 index 0000000..76add9e --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/crunch/io/hcatalog/ToHCat.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.hcatalog; + +import org.apache.crunch.Target; + +import java.util.Map; + +/** + * Factory static helper methods for writing data to HCatalog + * + * <pre> + * {@code + * Pipeline pipeline = new MRPipeline(this.getClass()); + * + * PCollection<HCatRecord> hcatRecords = pipeline.read(FromHCat.table("this-table"); + * + * pipeline.write(hcatRecords, ToHCat.table("that-table")); + * } + * </pre> + */ +public class ToHCat { + + /** + * Constructs a new instance to write to the provided hive {@code table} name. + * Writes to the "default" database. + * + * Note: if the destination table is partitioned, this constructor should not + * be used. It will only be usable by unpartitioned tables + * + * @param tableName + * the hive table to write to + */ + public static Target table(String tableName) { + return new HCatTarget(tableName); + } + + /** + * Constructs a new instance to write to the provided hive {@code tableName}, + * using the provided {@code database}. If null, uses "default" database. + * + * Note: if the destination table is partitioned, this constructor should not + * be used. It will only be usable by unpartitioned tables + * + * @param database + * the hive database to use for table namespacing + * @param tableName + * the hive table to write to + */ + public static Target table(String database, String tableName) { + return new HCatTarget(database, tableName); + } + + /** + * Constructs a new instance to write to the provided hive {@code table} name + * and {@code partitionValues}. Writes to the "default" database. + * + * Note: partitionValues will be assembled into a single directory path. + * + * For example, if the partition values are: + * + * <pre> + * [year, 2017], + * [month,11], + * [day, 10] + * + * The constructed directory path will be + * "[dataLocationRoot]/year=2017/month=11/day=10" + * </pre> + * + * @param tableName + * the hive table to write to + * @param partition + * the partition within the table it should be written + */ + public static Target table(String tableName, Map<String, String> partition) { + return new HCatTarget(tableName, partition); + } + + /** + * Constructs a new instance to write to the provided {@code database}, + * {@code tableName}, and to the specified {@code partition}. If + * {@code database} isn't specified, the "default" database is used + * + * Note: partitionValues will be assembled into a single directory path. + * + * For example, if the partition values are: + * + * <pre> + * [year, 2017], + * [month,11], + * [day, 10] + * + * The constructed directory path will be + * "[dataLocationRoot]/year=2017/month=11/day=10" + * </pre> + * + * @param database + * the hive database to use for table namespacing + * @param tableName + * the hive table to write to + * @param partition + * the partition within the table it should be written + */ + public static Target table(String database, String tableName, Map<String, String> partition) { + return new HCatTarget(database, tableName, partition); + } +}
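[Editorial usage sketch, not part of this patch.] Putting the two factories together, a copy-between-tables sketch in the spirit of the class-level javadoc example above; the table names come from that example, while the partition keys and values are assumptions:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.hcatalog.FromHCat;
    import org.apache.crunch.io.hcatalog.ToHCat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class CopyTableSketch {
      public static void main(String[] args) {
        Pipeline pipeline = new MRPipeline(CopyTableSketch.class, new Configuration());

        // Read every record of "this-table" from the "default" database.
        PCollection<HCatRecord> records = pipeline.read(FromHCat.table("this-table"));

        // Write them into one partition of "that-table"; the partition map becomes the
        // directory path year=2017/month=12/day=04 under the table's data location.
        Map<String, String> partition = ImmutableMap.of("year", "2017", "month", "12", "day", "04");
        pipeline.write(records, ToHCat.table("that-table", partition));

        pipeline.done();
      }
    }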
